From 48860aa7f1e5c2447ca1b0582f83e8eb078db123 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Wed, 14 Aug 2024 18:02:51 +0800 Subject: [PATCH 01/11] 1 --- .../org/apache/doris/mtmv/BaseTableInfo.java | 66 +++++++++------- .../apache/doris/mtmv/MTMVPartitionUtil.java | 4 +- .../mtmv/MTMVRefreshPartitionSnapshot.java | 20 ++++- .../doris/mtmv/MTMVRefreshSnapshot.java | 4 +- .../doris/mtmv/MTMVRelationManager.java | 6 +- .../org/apache/doris/mtmv/MTMVService.java | 15 +++- .../java/org/apache/doris/mtmv/MTMVUtil.java | 19 +++-- .../doris/mtmv/MTMVRelationManagerTest.java | 77 +++++++++++++++++-- 8 files changed, 155 insertions(+), 56 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java index bc9a3fdd2050f1..8d5d42b08356bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java @@ -19,9 +19,7 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.datasource.InternalCatalog; import com.google.common.base.Objects; import com.google.gson.annotations.SerializedName; @@ -31,26 +29,28 @@ public class BaseTableInfo { private static final Logger LOG = LogManager.getLogger(BaseTableInfo.class); + // The MTMV needs to record the name to avoid changing the ID after rebuilding the same named base table, + // which may make the materialized view unusable. + // The previous version stored the ID, so it is temporarily kept for compatibility with the old version @SerializedName("ti") + @Deprecated private long tableId; @SerializedName("di") + @Deprecated private long dbId; @SerializedName("ci") + @Deprecated private long ctlId; - public BaseTableInfo(long tableId, long dbId) { - this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is null"); - this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null"); - this.ctlId = InternalCatalog.INTERNAL_CATALOG_ID; - } - - public BaseTableInfo(long tableId, long dbId, long ctlId) { - this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is null"); - this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null"); - this.ctlId = java.util.Objects.requireNonNull(ctlId, "ctlId is null"); - } + @SerializedName("tn") + private String tableName; + @SerializedName("dn") + private String dbName; + @SerializedName("cn") + private String ctlName; public BaseTableInfo(TableIf table) { + java.util.Objects.requireNonNull(table, "table is null"); DatabaseIf database = table.getDatabase(); java.util.Objects.requireNonNull(database, "database is null"); CatalogIf catalog = database.getCatalog(); @@ -58,16 +58,34 @@ public BaseTableInfo(TableIf table) { this.tableId = table.getId(); this.dbId = database.getId(); this.ctlId = catalog.getId(); + this.tableName = table.getName(); + this.dbName = database.getFullName(); + this.ctlName = catalog.getName(); + } + + public String getTableName() { + return tableName; + } + + public String getDbName() { + return dbName; + } + + public String getCtlName() { + return ctlName; } + @Deprecated public long getTableId() { return tableId; } + @Deprecated public long getDbId() { return dbId; } + @Deprecated public long getCtlId() { return ctlId; } @@ -81,31 +99,21 @@ public boolean equals(Object o) { return false; } BaseTableInfo that = (BaseTableInfo) o; - return Objects.equal(tableId, that.tableId) - && Objects.equal(dbId, that.dbId) - && Objects.equal(ctlId, that.ctlId); + return Objects.equal(tableName, that.tableName) && Objects.equal( + dbName, that.dbName) && Objects.equal(ctlName, that.ctlName); } @Override public int hashCode() { - return Objects.hashCode(tableId, dbId, ctlId); + return Objects.hashCode(tableName, dbName, ctlName); } @Override public String toString() { return "BaseTableInfo{" - + "tableId=" + tableId - + ", dbId=" + dbId - + ", ctlId=" + ctlId + + "tableName='" + tableName + '\'' + + ", dbName='" + dbName + '\'' + + ", ctlName='" + ctlName + '\'' + '}'; } - - public String getTableName() { - try { - return MTMVUtil.getTable(this).getName(); - } catch (AnalysisException e) { - LOG.warn("can not get table: " + this); - return ""; - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index b07ca6ad1d10bf..b88bdb80aef3ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -440,7 +440,7 @@ private static boolean isSyncWithBaseTable(MTMV mtmv, String mtmvPartitionName, } MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(); return mtmv.getRefreshSnapshot() - .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot); + .equalsWithBaseTable(mtmvPartitionName, new BaseTableInfo(baseTable), baseTableCurrentSnapshot); } /** @@ -488,7 +488,7 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv, if (!(table instanceof MTMVRelatedTableIf)) { continue; } - refreshPartitionSnapshot.getTables().put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot()); + refreshPartitionSnapshot.addTableSnapshot(baseTableInfo, ((MTMVRelatedTableIf) table).getTableSnapshot()); } return refreshPartitionSnapshot; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java index 2336c3922ea4b1..ec62481c49e16f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java @@ -25,8 +25,12 @@ public class MTMVRefreshPartitionSnapshot { @SerializedName("p") private Map partitions; + // old version only persist table id, we need `BaseTableInfo`, `tables` only for compatible old version @SerializedName("t") + @Deprecated private Map tables; + @SerializedName("ti") + private Map tablesInfo; public MTMVRefreshPartitionSnapshot() { this.partitions = Maps.newConcurrentMap(); @@ -37,15 +41,25 @@ public Map getPartitions() { return partitions; } - public Map getTables() { - return tables; + public MTMVSnapshotIf getTableSnapshot(BaseTableInfo table) { + if (tablesInfo.containsKey(table)) { + return tablesInfo.get(table); + } + // for compatible old version + return tables.get(table.getTableId()); + } + + public void addTableSnapshot(BaseTableInfo baseTableInfo, MTMVSnapshotIf tableSnapshot) { + tablesInfo.put(baseTableInfo, tableSnapshot); + // for compatible old version + tables.put(baseTableInfo.getTableId(), tableSnapshot); } @Override public String toString() { return "MTMVRefreshPartitionSnapshot{" + "partitions=" + partitions - + ", tables=" + tables + + ", tablesInfo=" + tablesInfo + '}'; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java index d48911275e886b..31ebb4aeec0551 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java @@ -55,13 +55,13 @@ public Set getSnapshotPartitions(String mtmvPartitionName) { return partitionSnapshot.getPartitions().keySet(); } - public boolean equalsWithBaseTable(String mtmvPartitionName, long baseTableId, + public boolean equalsWithBaseTable(String mtmvPartitionName, BaseTableInfo tableInfo, MTMVSnapshotIf baseTableCurrentSnapshot) { MTMVRefreshPartitionSnapshot partitionSnapshot = partitionSnapshots.get(mtmvPartitionName); if (partitionSnapshot == null) { return false; } - MTMVSnapshotIf relatedPartitionSnapshot = partitionSnapshot.getTables().get(baseTableId); + MTMVSnapshotIf relatedPartitionSnapshot = partitionSnapshot.getTableSnapshot(tableInfo); if (relatedPartitionSnapshot == null) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java index b5f8bbbf663d26..ba8d85c6e4f593 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java @@ -187,7 +187,7 @@ public void dropMTMV(MTMV mtmv) throws DdlException { */ @Override public void registerMTMV(MTMV mtmv, Long dbId) { - refreshMTMVCache(mtmv.getRelation(), new BaseTableInfo(mtmv.getId(), dbId)); + refreshMTMVCache(mtmv.getRelation(), new BaseTableInfo(mtmv)); } /** @@ -269,9 +269,7 @@ private void processBaseTableChange(Table table, String msgPrefix) { for (BaseTableInfo mtmvInfo : mtmvsByBaseTable) { Table mtmv = null; try { - mtmv = Env.getCurrentEnv().getInternalCatalog() - .getDbOrAnalysisException(mtmvInfo.getDbId()) - .getTableOrAnalysisException(mtmvInfo.getTableId()); + mtmv = (Table) MTMVUtil.getTable(mtmvInfo); } catch (AnalysisException e) { LOG.warn(e); continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java index d5d86b7eedab97..907697a0554896 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java @@ -17,8 +17,10 @@ package org.apache.doris.mtmv; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -177,12 +179,21 @@ public void processEvent(Event event) throws EventException { } TableEvent tableEvent = (TableEvent) event; LOG.info("processEvent, Event: {}", event); + TableIf table; + try { + table = Env.getCurrentEnv().getCatalogMgr() + .getCatalogOrAnalysisException(tableEvent.getCtlId()) + .getDbOrAnalysisException(tableEvent.getDbId()) + .getTableOrAnalysisException(tableEvent.getTableId()); + } catch (AnalysisException e) { + throw new EventException(e); + } Set mtmvs = relationManager.getMtmvsByBaseTableOneLevel( - new BaseTableInfo(tableEvent.getTableId(), tableEvent.getDbId(), tableEvent.getCtlId())); + new BaseTableInfo(table)); for (BaseTableInfo baseTableInfo : mtmvs) { try { // check if mtmv should trigger by event - MTMV mtmv = MTMVUtil.getMTMV(baseTableInfo.getDbId(), baseTableInfo.getTableId()); + MTMV mtmv = (MTMV) MTMVUtil.getTable(baseTableInfo); if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT)) { jobManager.onCommit(mtmv); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java index 4868ef94a1b570..b56b61f6d16038 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -37,6 +37,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import java.util.List; import java.util.Optional; @@ -52,11 +53,17 @@ public class MTMVUtil { * @throws AnalysisException */ public static TableIf getTable(BaseTableInfo baseTableInfo) throws AnalysisException { - TableIf table = Env.getCurrentEnv().getCatalogMgr() - .getCatalogOrAnalysisException(baseTableInfo.getCtlId()) - .getDbOrAnalysisException(baseTableInfo.getDbId()) - .getTableOrAnalysisException(baseTableInfo.getTableId()); - return table; + // for compatible old version, not have name + if (StringUtils.isEmpty(baseTableInfo.getCtlName())) { + return Env.getCurrentEnv().getCatalogMgr() + .getCatalogOrAnalysisException(baseTableInfo.getCtlId()) + .getDbOrAnalysisException(baseTableInfo.getDbId()) + .getTableOrAnalysisException(baseTableInfo.getTableId()); + } + return Env.getCurrentEnv().getCatalogMgr() + .getCatalogOrAnalysisException(baseTableInfo.getCtlName()) + .getDbOrAnalysisException(baseTableInfo.getDbName()) + .getTableOrAnalysisException(baseTableInfo.getTableName()); } public static MTMVRelatedTableIf getRelatedTable(BaseTableInfo baseTableInfo) { @@ -87,7 +94,7 @@ public static MTMV getMTMV(long dbId, long mtmvId) throws DdlException, MetaNotF public static boolean mtmvContainsExternalTable(MTMV mtmv) { Set baseTables = mtmv.getRelation().getBaseTablesOneLevel(); for (BaseTableInfo baseTableInfo : baseTables) { - if (baseTableInfo.getCtlId() != InternalCatalog.INTERNAL_CATALOG_ID) { + if (!baseTableInfo.getCtlName().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) { return true; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelationManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelationManagerTest.java index 697643337c2391..40263705c43f99 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelationManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelationManagerTest.java @@ -17,22 +17,87 @@ package org.apache.doris.mtmv; +import org.apache.doris.common.AnalysisException; + import com.google.common.collect.Sets; +import mockit.Expectations; +import mockit.Mocked; import org.apache.commons.collections.CollectionUtils; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import java.util.Set; public class MTMVRelationManagerTest { + @Mocked + private BaseTableInfo mv1; + @Mocked + private BaseTableInfo mv2; + @Mocked + private BaseTableInfo t3; + @Mocked + private BaseTableInfo t4; + + @Before + public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException { + new Expectations() { + { + mv1.getCtlName(); + minTimes = 0; + result = "ctl1"; + + mv1.getDbName(); + minTimes = 0; + result = "db1"; + + mv1.getTableName(); + minTimes = 0; + result = "mv1"; + + mv2.getCtlName(); + minTimes = 0; + result = "ctl1"; + + mv2.getDbName(); + minTimes = 0; + result = "db1"; + + mv2.getTableName(); + minTimes = 0; + result = "mv2"; + + t3.getCtlName(); + minTimes = 0; + result = "ctl1"; + + t3.getDbName(); + minTimes = 0; + result = "db1"; + + t3.getTableName(); + minTimes = 0; + result = "t3"; + + t4.getCtlName(); + minTimes = 0; + result = "ctl1"; + + t4.getDbName(); + minTimes = 0; + result = "db1"; + + t4.getTableName(); + minTimes = 0; + result = "t4"; + } + }; + } + @Test public void testGetMtmvsByBaseTableOneLevel() { // mock mv2==>mv1,t3; mv1==>t4 MTMVRelationManager manager = new MTMVRelationManager(); - BaseTableInfo mv1 = new BaseTableInfo(0L, 1L); - BaseTableInfo mv2 = new BaseTableInfo(0L, 2L); - BaseTableInfo t3 = new BaseTableInfo(0L, 3L); - BaseTableInfo t4 = new BaseTableInfo(0L, 4L); MTMVRelation mv2Relation = new MTMVRelation(Sets.newHashSet(mv1, t3, t4), Sets.newHashSet(mv1, t3), Sets.newHashSet()); MTMVRelation mv1Relation = new MTMVRelation(Sets.newHashSet(t4), Sets.newHashSet(t4), @@ -68,10 +133,6 @@ public void testGetMtmvsByBaseTableOneLevel() { public void testGetMtmvsByBaseTable() { // mock mv2==>mv1,t3; mv1==>t4 MTMVRelationManager manager = new MTMVRelationManager(); - BaseTableInfo mv1 = new BaseTableInfo(0L, 1L); - BaseTableInfo mv2 = new BaseTableInfo(0L, 2L); - BaseTableInfo t3 = new BaseTableInfo(0L, 3L); - BaseTableInfo t4 = new BaseTableInfo(0L, 4L); MTMVRelation mv2Relation = new MTMVRelation(Sets.newHashSet(mv1, t3, t4), Sets.newHashSet(mv1, t3), Sets.newHashSet()); MTMVRelation mv1Relation = new MTMVRelation(Sets.newHashSet(t4), Sets.newHashSet(t4), From ab26d61bf658122904331f68aadb1dcee3e3473a Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Wed, 14 Aug 2024 18:54:09 +0800 Subject: [PATCH 02/11] 1 --- .../mtmv/MTMVRefreshPartitionSnapshot.java | 1 + .../java/org/apache/doris/mtmv/MTMVUtil.java | 2 +- .../doris/mtmv/MTMVPartitionUtilTest.java | 4 +- .../doris/mtmv/MTMVRefreshSnapshotTest.java | 54 ++++++++++++++++--- 4 files changed, 52 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java index ec62481c49e16f..fb8f79a59c7c2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java @@ -35,6 +35,7 @@ public class MTMVRefreshPartitionSnapshot { public MTMVRefreshPartitionSnapshot() { this.partitions = Maps.newConcurrentMap(); this.tables = Maps.newConcurrentMap(); + this.tablesInfo = Maps.newConcurrentMap(); } public Map getPartitions() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java index b56b61f6d16038..71c3a5c8b689cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -60,7 +60,7 @@ public static TableIf getTable(BaseTableInfo baseTableInfo) throws AnalysisExcep .getDbOrAnalysisException(baseTableInfo.getDbId()) .getTableOrAnalysisException(baseTableInfo.getTableId()); } - return Env.getCurrentEnv().getCatalogMgr() + return Env.getCurrentEnv().getCatalogMgr() .getCatalogOrAnalysisException(baseTableInfo.getCtlName()) .getDbOrAnalysisException(baseTableInfo.getDbName()) .getTableOrAnalysisException(baseTableInfo.getTableName()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java index 63a75c724988ed..997385742dc09a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -120,7 +120,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = refreshSnapshot; - refreshSnapshot.equalsWithBaseTable(anyString, anyLong, (MTMVSnapshotIf) any); + refreshSnapshot.equalsWithBaseTable(anyString, (BaseTableInfo) any, (MTMVSnapshotIf) any); minTimes = 0; result = true; @@ -157,7 +157,7 @@ public void testIsMTMVSyncNormal() { public void testIsMTMVSyncNotSync() { new Expectations() { { - refreshSnapshot.equalsWithBaseTable(anyString, anyLong, (MTMVSnapshotIf) any); + refreshSnapshot.equalsWithBaseTable(anyString, (BaseTableInfo) any, (MTMVSnapshotIf) any); minTimes = 0; result = false; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java index 42b5b7838419ed..1890f9c9805926 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java @@ -21,6 +21,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import mockit.Expectations; +import mockit.Mocked; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -35,14 +37,54 @@ public class MTMVRefreshSnapshotTest { private MTMVRefreshSnapshot refreshSnapshot = new MTMVRefreshSnapshot(); private MTMVVersionSnapshot p1Snapshot = new MTMVVersionSnapshot(correctVersion); private MTMVVersionSnapshot t1Snapshot = new MTMVVersionSnapshot(correctVersion); + @Mocked + private BaseTableInfo existTable; + @Mocked + private BaseTableInfo nonExistTable; @Before public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException { + new Expectations() { + { + existTable.getCtlName(); + minTimes = 0; + result = "ctl1"; + + existTable.getDbName(); + minTimes = 0; + result = "db1"; + + existTable.getTableName(); + minTimes = 0; + result = "t1"; + + existTable.getTableId(); + minTimes = 0; + result = 1L; + + nonExistTable.getCtlName(); + minTimes = 0; + result = "ctl1"; + + nonExistTable.getDbName(); + minTimes = 0; + result = "db1"; + + nonExistTable.getTableName(); + minTimes = 0; + result = "t2"; + + nonExistTable.getTableId(); + minTimes = 0; + result = 2L; + } + }; + Map partitionSnapshots = Maps.newHashMap(); MTMVRefreshPartitionSnapshot mvp1PartitionSnapshot = new MTMVRefreshPartitionSnapshot(); partitionSnapshots.put(mvExistPartitionName, mvp1PartitionSnapshot); mvp1PartitionSnapshot.getPartitions().put(relatedExistPartitionName, p1Snapshot); - mvp1PartitionSnapshot.getTables().put(baseExistTableId, t1Snapshot); + mvp1PartitionSnapshot.addTableSnapshot(existTable, t1Snapshot); refreshSnapshot.updateSnapshots(partitionSnapshots, Sets.newHashSet(mvExistPartitionName)); } @@ -73,23 +115,23 @@ public void testPartitionSync() { @Test public void testTableSync() { // normal - boolean sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, baseExistTableId, + boolean sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, existTable, new MTMVVersionSnapshot(correctVersion)); Assert.assertTrue(sync); // non exist mv partition sync = refreshSnapshot - .equalsWithBaseTable("mvp2", baseExistTableId, new MTMVVersionSnapshot(correctVersion)); + .equalsWithBaseTable("mvp2", existTable, new MTMVVersionSnapshot(correctVersion)); Assert.assertFalse(sync); // non exist related partition sync = refreshSnapshot - .equalsWithBaseTable(mvExistPartitionName, 2L, new MTMVVersionSnapshot(correctVersion)); + .equalsWithBaseTable(mvExistPartitionName, nonExistTable, new MTMVVersionSnapshot(correctVersion)); Assert.assertFalse(sync); // snapshot value not equal sync = refreshSnapshot - .equalsWithBaseTable(mvExistPartitionName, baseExistTableId, new MTMVVersionSnapshot(2L)); + .equalsWithBaseTable(mvExistPartitionName, existTable, new MTMVVersionSnapshot(2L)); Assert.assertFalse(sync); // snapshot type not equal - sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, baseExistTableId, + sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, existTable, new MTMVTimestampSnapshot(correctVersion)); Assert.assertFalse(sync); } From a2c0fcf481eb45981f376fd51199f8b929c0e82f Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Wed, 14 Aug 2024 19:05:40 +0800 Subject: [PATCH 03/11] 1 --- .../suites/mtmv_p0/test_hive_mtmv.groovy | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy index 872d7bf8ec0369..4ac5ad9e890463 100644 --- a/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy @@ -68,8 +68,24 @@ suite("test_hive_mtmv", "p0,external,hive,external_docker,external_docker_hive") waitingMTMVTaskFinished(jobName) order_qt_refresh_complete "SELECT * FROM ${mvName} order by id" - sql """drop materialized view if exists ${mvName};""" + order_qt_is_sync_before_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + // rebuild catalog, should not Affects MTMV + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}' + );""" + + order_qt_is_sync_after_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + // should refresh normal after catalog rebuild + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinished(jobName) + order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} order by id" + + sql """drop materialized view if exists ${mvName};""" sql """drop catalog if exists ${catalog_name}""" } finally { } From 513a268fabeb913aff3b9055a595fff1c13cef79 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Wed, 14 Aug 2024 19:22:39 +0800 Subject: [PATCH 04/11] 1 --- .../java/org/apache/doris/mtmv/BaseTableInfo.java | 13 +++++++++++++ .../org/apache/doris/mtmv/MTMVRelationManager.java | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java index 8d5d42b08356bf..5188e463638d81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java @@ -18,8 +18,10 @@ package org.apache.doris.mtmv; import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.InternalCatalog; import com.google.common.base.Objects; import com.google.gson.annotations.SerializedName; @@ -63,6 +65,17 @@ public BaseTableInfo(TableIf table) { this.ctlName = catalog.getName(); } + // for replay MTMV, can not use `table.getDatabase();`,because database not added to catalog + public BaseTableInfo(OlapTable table, long dbId) { + java.util.Objects.requireNonNull(table, "table is null"); + this.tableId = table.getId(); + this.dbId = dbId; + this.ctlId = InternalCatalog.INTERNAL_CATALOG_ID; + this.tableName = table.getName(); + this.dbName = table.getDBName(); + this.ctlName = InternalCatalog.INTERNAL_CATALOG_NAME; + } + public String getTableName() { return tableName; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java index ba8d85c6e4f593..8484464d2fd5db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java @@ -187,7 +187,7 @@ public void dropMTMV(MTMV mtmv) throws DdlException { */ @Override public void registerMTMV(MTMV mtmv, Long dbId) { - refreshMTMVCache(mtmv.getRelation(), new BaseTableInfo(mtmv)); + refreshMTMVCache(mtmv.getRelation(), new BaseTableInfo(mtmv, dbId)); } /** From 82cab60f2c7a1b57572479af19260f3f5ce09331 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Wed, 14 Aug 2024 19:27:49 +0800 Subject: [PATCH 05/11] 1 --- .../data/mtmv_p0/test_hive_mtmv.out | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/regression-test/data/mtmv_p0/test_hive_mtmv.out b/regression-test/data/mtmv_p0/test_hive_mtmv.out index 50c8016c87d087..1176f9320ce00f 100644 --- a/regression-test/data/mtmv_p0/test_hive_mtmv.out +++ b/regression-test/data/mtmv_p0/test_hive_mtmv.out @@ -4,12 +4,21 @@ 2 B 20230101 3 C 20230101 --- !refresh_other_partition -- +-- !refresh_complete -- 1 A 20230101 2 B 20230101 3 C 20230101 +4 D 20230102 +5 E 20230102 +6 F 20230102 --- !refresh_complete -- +-- !is_sync_before_rebuild -- +true + +-- !is_sync_after_rebuild -- +true + +-- !refresh_complete_rebuild -- 1 A 20230101 2 B 20230101 3 C 20230101 @@ -22,12 +31,21 @@ 2 B 20230101 3 C 20230101 --- !refresh_other_partition -- +-- !refresh_complete -- 1 A 20230101 2 B 20230101 3 C 20230101 +4 D 20230102 +5 E 20230102 +6 F 20230102 --- !refresh_complete -- +-- !is_sync_before_rebuild -- +true + +-- !is_sync_after_rebuild -- +true + +-- !refresh_complete_rebuild -- 1 A 20230101 2 B 20230101 3 C 20230101 From 473d438578e25f003ac0e3f3d38b6bf98be08a75 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Thu, 15 Aug 2024 11:37:13 +0800 Subject: [PATCH 06/11] 1 --- .../src/main/java/org/apache/doris/alter/Alter.java | 3 ++- .../java/org/apache/doris/mtmv/BaseTableInfo.java | 4 ++++ .../java/org/apache/doris/mtmv/MTMVHookService.java | 2 +- .../java/org/apache/doris/mtmv/MTMVJobManager.java | 2 +- .../org/apache/doris/mtmv/MTMVRelationManager.java | 11 ++++++----- .../main/java/org/apache/doris/mtmv/MTMVService.java | 4 ++-- 6 files changed, 16 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 1fcb4fe65c38a3..2fbfa98f7ef74e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -170,6 +170,7 @@ private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable, olapTable.checkNormalStateForAlter(); boolean needProcessOutsideTableLock = false; + String oldTableName = olapTable.getName(); if (currentAlterOps.checkTableStoragePolicy(alterClauses)) { String tableStoragePolicy = olapTable.getStoragePolicy(); String currentStoragePolicy = currentAlterOps.getTableStoragePolicy(alterClauses); @@ -282,7 +283,7 @@ private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable, throw new DdlException("Invalid alter operations: " + currentAlterOps); } if (needChangeMTMVState(alterClauses)) { - Env.getCurrentEnv().getMtmvService().alterTable(olapTable); + Env.getCurrentEnv().getMtmvService().alterTable(olapTable, oldTableName); } return needProcessOutsideTableLock; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java index 5188e463638d81..ba906f248aa5c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java @@ -103,6 +103,10 @@ public long getCtlId() { return ctlId; } + public void setTableName(String tableName) { + this.tableName = tableName; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java index d9ab9984581227..e0edd06f8c9418 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java @@ -104,7 +104,7 @@ public interface MTMVHookService { * * @param table */ - void alterTable(Table table); + void alterTable(Table table, String oldTableName); /** * Triggered when pause mtmv diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index bed44e8d37d136..11089899b309a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -188,7 +188,7 @@ public void dropTable(Table table) { } @Override - public void alterTable(Table table) { + public void alterTable(Table table, String oldTableName) { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java index 8484464d2fd5db..436427526ba08b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java @@ -232,7 +232,7 @@ public void refreshComplete(MTMV mtmv, MTMVRelation relation, MTMVTask task) { */ @Override public void dropTable(Table table) { - processBaseTableChange(table, "The base table has been deleted:"); + processBaseTableChange(new BaseTableInfo(table), "The base table has been deleted:"); } /** @@ -241,8 +241,10 @@ public void dropTable(Table table) { * @param table */ @Override - public void alterTable(Table table) { - processBaseTableChange(table, "The base table has been updated:"); + public void alterTable(Table table, String oldTableName) { + BaseTableInfo baseTableInfo = new BaseTableInfo(table); + baseTableInfo.setTableName(oldTableName); + processBaseTableChange(baseTableInfo, "The base table has been updated:"); } @Override @@ -260,8 +262,7 @@ public void cancelMTMVTask(CancelMTMVTaskInfo info) { } - private void processBaseTableChange(Table table, String msgPrefix) { - BaseTableInfo baseTableInfo = new BaseTableInfo(table); + private void processBaseTableChange(BaseTableInfo baseTableInfo, String msgPrefix) { Set mtmvsByBaseTable = getMtmvsByBaseTable(baseTableInfo); if (CollectionUtils.isEmpty(mtmvsByBaseTable)) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java index 907697a0554896..865f9f35a66920 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java @@ -130,11 +130,11 @@ public void dropTable(Table table) { } } - public void alterTable(Table table) { + public void alterTable(Table table, String oldTableName) { Objects.requireNonNull(table); LOG.info("alterTable, tableName: {}", table.getName()); for (MTMVHookService mtmvHookService : hooks.values()) { - mtmvHookService.alterTable(table); + mtmvHookService.alterTable(table, oldTableName); } } From 814e1be78d0ed1fa95c93d72b0464cd1981ba653 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Thu, 15 Aug 2024 11:47:16 +0800 Subject: [PATCH 07/11] 1 --- .../trees/plans/commands/info/AlterMTMVRenameInfo.java | 2 +- .../trees/plans/commands/info/AlterMTMVReplaceInfo.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVRenameInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVRenameInfo.java index c86626b5920cca..066342c3b2c706 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVRenameInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVRenameInfo.java @@ -61,6 +61,6 @@ public void run() throws DdlException { Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb()); Table table = db.getTableOrDdlException(mvName.getTbl()); Env.getCurrentEnv().renameTable(db, table, newName); - Env.getCurrentEnv().getMtmvService().alterTable(table); + Env.getCurrentEnv().getMtmvService().alterTable(table, mvName.getTbl()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVReplaceInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVReplaceInfo.java index 440db1e1400cdc..6dd0907db62063 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVReplaceInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVReplaceInfo.java @@ -90,9 +90,9 @@ public void run() throws UserException { MTMV mtmv = (MTMV) db.getTableOrDdlException(mvName.getTbl(), TableType.MATERIALIZED_VIEW); MTMV newMtmv = (MTMV) db.getTableOrDdlException(newName, TableType.MATERIALIZED_VIEW); Env.getCurrentEnv().getAlterInstance().processReplaceTable(db, mtmv, newName, swapTable); - Env.getCurrentEnv().getMtmvService().alterTable(newMtmv); + Env.getCurrentEnv().getMtmvService().alterTable(newMtmv, mvName.getTbl()); if (swapTable) { - Env.getCurrentEnv().getMtmvService().alterTable(mtmv); + Env.getCurrentEnv().getMtmvService().alterTable(mtmv, newName); } else { Env.getCurrentEnv().getMtmvService().dropMTMV(mtmv); Env.getCurrentEnv().getMtmvService().dropTable(mtmv); From 6f6917f6507a60cae761d43753dfa231e4112968 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Thu, 15 Aug 2024 12:58:36 +0800 Subject: [PATCH 08/11] 1 --- .../java/org/apache/doris/catalog/Env.java | 9 +++++ .../java/org/apache/doris/catalog/MTMV.java | 21 ++++++++-- .../org/apache/doris/mtmv/BaseTableInfo.java | 19 ++++++++++ .../apache/doris/mtmv/MTMVPartitionInfo.java | 8 ++++ .../mtmv/MTMVRefreshPartitionSnapshot.java | 38 +++++++++++++++++++ .../doris/mtmv/MTMVRefreshSnapshot.java | 11 ++++++ .../org/apache/doris/mtmv/MTMVRelation.java | 18 +++++++++ .../java/org/apache/doris/mtmv/MTMVUtil.java | 13 +++++++ 8 files changed, 134 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 907f25739208a0..98f865601ee340 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -189,6 +189,7 @@ import org.apache.doris.mtmv.MTMVRelation; import org.apache.doris.mtmv.MTMVService; import org.apache.doris.mtmv.MTMVStatus; +import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.authenticate.AuthenticateType; import org.apache.doris.mysql.authenticate.AuthenticatorManager; import org.apache.doris.mysql.privilege.AccessControllerManager; @@ -1715,6 +1716,14 @@ public boolean postProcessAfterMetadataReplayed(boolean waitCatalogReady) { auth.rectifyPrivs(); catalogMgr.registerCatalogRefreshListener(this); + // MTMV needs to be compatible with old metadata, and during the compatibility process, + // it needs to wait for all catalog data to be ready, so it cannot be processed through gsonPostProcess() + // We catch all possible exceptions to avoid FE startup failure + try { + MTMVUtil.compatibleMTMV(catalogMgr); + } catch (Throwable t) { + LOG.warn("compatibleMTMV failed", t); + } return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index cd7583193e8c2b..1611951e64792c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -25,6 +25,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.extensions.mtmv.MTMVTask; import org.apache.doris.mtmv.MTMVCache; @@ -47,9 +48,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; -//import com.google.gson.JsonElement; -//import com.google.gson.JsonObject; -//import com.google.gson.JsonParser; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; @@ -489,4 +487,21 @@ public String toInfoString() { sb.append('}'); return sb.toString(); } + + /** + * Previously, ID was used to store the related table of materialized views, + * but when the catalog is deleted, the ID will change, so name is used instead. + * The logic here is to be compatible with older versions by converting ID to name + */ + public void compatible(CatalogMgr catalogMgr) { + if (mvPartitionInfo != null) { + mvPartitionInfo.compatible(catalogMgr); + } + if (relation != null) { + relation.compatible(catalogMgr); + } + if (refreshSnapshot != null) { + refreshSnapshot.compatible(this); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java index ba906f248aa5c2..ba35487cfa6e3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java @@ -20,11 +20,14 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.InternalCatalog; import com.google.common.base.Objects; import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -133,4 +136,20 @@ public String toString() { + ", ctlName='" + ctlName + '\'' + '}'; } + + public void compatible(CatalogMgr catalogMgr) { + if (!StringUtils.isEmpty(ctlName)) { + return; + } + try { + CatalogIf catalog = catalogMgr.getCatalogOrAnalysisException(ctlId); + DatabaseIf db = catalog.getDbOrAnalysisException(dbId); + TableIf table = db.getTableOrAnalysisException(tableId); + this.ctlName = catalog.getName(); + this.dbName = db.getFullName(); + this.tableName = table.getName(); + } catch (AnalysisException e) { + LOG.info("MTMV compatible failed, ctlId: {}, dbId: {}, tableId: {}", ctlId, dbId, tableId, e); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java index ff4060f334a952..b3cd239269abc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.catalog.Column; import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.CatalogMgr; import com.google.gson.annotations.SerializedName; @@ -149,4 +150,11 @@ public String toNameString() { + '}'; } } + + public void compatible(CatalogMgr catalogMgr) { + if (relatedTable == null) { + return; + } + relatedTable.compatible(catalogMgr); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java index fb8f79a59c7c2c..8a4f8e64a4b999 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java @@ -17,12 +17,21 @@ package org.apache.doris.mtmv; +import org.apache.doris.catalog.MTMV; + import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; +import org.apache.commons.collections.CollectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; public class MTMVRefreshPartitionSnapshot { + private static final Logger LOG = LogManager.getLogger(MTMV.class); @SerializedName("p") private Map partitions; // old version only persist table id, we need `BaseTableInfo`, `tables` only for compatible old version @@ -63,4 +72,33 @@ public String toString() { + ", tablesInfo=" + tablesInfo + '}'; } + + public void compatible(MTMV mtmv) { + if (tables.size() == tablesInfo.size()) { + return; + } + MTMVRelation relation = mtmv.getRelation(); + if (relation == null || CollectionUtils.isEmpty(relation.getBaseTablesOneLevel())) { + return; + } + for (Entry entry : tables.entrySet()) { + Optional tableInfo = getByTableId(entry.getKey(), + relation.getBaseTablesOneLevel()); + if (tableInfo.isPresent()) { + tablesInfo.put(tableInfo.get(), entry.getValue()); + } else { + LOG.info("MTMV compatible failed, tableId: {}, relationTables: {}", entry.getKey(), + relation.getBaseTablesOneLevel()); + } + } + } + + private Optional getByTableId(Long tableId, Set baseTables) { + for (BaseTableInfo info : baseTables) { + if (info.getTableId() == tableId) { + return Optional.of(info); + } + } + return Optional.empty(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java index 31ebb4aeec0551..74fc3cc1c5cfb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java @@ -17,6 +17,8 @@ package org.apache.doris.mtmv; +import org.apache.doris.catalog.MTMV; + import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; @@ -88,4 +90,13 @@ public String toString() { + "partitionSnapshots=" + partitionSnapshots + '}'; } + + public void compatible(MTMV mtmv) { + if (MapUtils.isEmpty(partitionSnapshots)) { + return; + } + for (MTMVRefreshPartitionSnapshot snapshot : partitionSnapshots.values()) { + snapshot.compatible(mtmv); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelation.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelation.java index aec89caa508423..87a0199f128f88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelation.java @@ -17,7 +17,10 @@ package org.apache.doris.mtmv; +import org.apache.doris.datasource.CatalogMgr; + import com.google.gson.annotations.SerializedName; +import org.apache.commons.collections.CollectionUtils; import java.util.Set; @@ -61,4 +64,19 @@ public String toInfoString() { + ", baseViews=" + baseViews + '}'; } + + public void compatible(CatalogMgr catalogMgr) { + compatible(catalogMgr, baseTables); + compatible(catalogMgr, baseViews); + compatible(catalogMgr, baseTablesOneLevel); + } + + private void compatible(CatalogMgr catalogMgr, Set infos) { + if (CollectionUtils.isEmpty(infos)) { + return; + } + for (BaseTableInfo baseTableInfo : infos) { + baseTableInfo.compatible(catalogMgr); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java index 71c3a5c8b689cb..899904efc8896c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -27,6 +27,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeExtractAndTransform; @@ -158,4 +159,16 @@ public static void checkModifyMTMVData(Database db, List tableIdList, Conn } } } + + public static void compatibleMTMV(CatalogMgr catalogMgr) { + List dbs = catalogMgr.getInternalCatalog().getDbs(); + for (Database database : dbs) { + List tables = database.getTables(); + for (Table table : tables) { + if (table instanceof MTMV) { + ((MTMV) table).compatible(catalogMgr); + } + } + } + } } From 18a34f477b2358a9c02e843fe1a92a81de6ec742 Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Fri, 16 Aug 2024 15:11:11 +0800 Subject: [PATCH 09/11] 1 --- .../main/java/org/apache/doris/mtmv/BaseTableInfo.java | 2 +- .../apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java | 2 +- .../src/main/java/org/apache/doris/mtmv/MTMVUtil.java | 9 +++++---- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java index ba35487cfa6e3d..a53646e48ba330 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java @@ -149,7 +149,7 @@ public void compatible(CatalogMgr catalogMgr) { this.dbName = db.getFullName(); this.tableName = table.getName(); } catch (AnalysisException e) { - LOG.info("MTMV compatible failed, ctlId: {}, dbId: {}, tableId: {}", ctlId, dbId, tableId, e); + LOG.warn("MTMV compatible failed, ctlId: {}, dbId: {}, tableId: {}", ctlId, dbId, tableId, e); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java index 8a4f8e64a4b999..63bbfc2e037084 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java @@ -87,7 +87,7 @@ public void compatible(MTMV mtmv) { if (tableInfo.isPresent()) { tablesInfo.put(tableInfo.get(), entry.getValue()); } else { - LOG.info("MTMV compatible failed, tableId: {}, relationTables: {}", entry.getKey(), + LOG.warn("MTMV compatible failed, tableId: {}, relationTables: {}", entry.getKey(), relation.getBaseTablesOneLevel()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java index 899904efc8896c..e84136489291f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -60,11 +60,12 @@ public static TableIf getTable(BaseTableInfo baseTableInfo) throws AnalysisExcep .getCatalogOrAnalysisException(baseTableInfo.getCtlId()) .getDbOrAnalysisException(baseTableInfo.getDbId()) .getTableOrAnalysisException(baseTableInfo.getTableId()); + } else { + return Env.getCurrentEnv().getCatalogMgr() + .getCatalogOrAnalysisException(baseTableInfo.getCtlName()) + .getDbOrAnalysisException(baseTableInfo.getDbName()) + .getTableOrAnalysisException(baseTableInfo.getTableName()); } - return Env.getCurrentEnv().getCatalogMgr() - .getCatalogOrAnalysisException(baseTableInfo.getCtlName()) - .getDbOrAnalysisException(baseTableInfo.getDbName()) - .getTableOrAnalysisException(baseTableInfo.getTableName()); } public static MTMVRelatedTableIf getRelatedTable(BaseTableInfo baseTableInfo) { From a6a84ab9247fef0714320ed1d604b2ff3973b8ca Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 19 Aug 2024 14:49:48 +0800 Subject: [PATCH 10/11] event add name,and mv use name --- .../apache/doris/event/DataChangeEvent.java | 4 ++- .../doris/event/DropPartitionEvent.java | 4 ++- .../doris/event/ReplacePartitionEvent.java | 4 ++- .../org/apache/doris/event/TableEvent.java | 32 ++++++++++++++++++- .../org/apache/doris/mtmv/MTMVService.java | 6 ++-- .../transaction/DatabaseTransactionMgr.java | 2 +- 6 files changed, 44 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java b/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java index d58e62bfddeb0e..1e1a265d704348 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java @@ -17,8 +17,10 @@ package org.apache.doris.event; +import org.apache.doris.common.AnalysisException; + public class DataChangeEvent extends TableEvent { - public DataChangeEvent(long ctlId, long dbId, long tableId) { + public DataChangeEvent(long ctlId, long dbId, long tableId) throws AnalysisException { super(EventType.DATA_CHANGE, ctlId, dbId, tableId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java index 67339ebd05ab55..598768aa8de724 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java @@ -17,8 +17,10 @@ package org.apache.doris.event; +import org.apache.doris.common.AnalysisException; + public class DropPartitionEvent extends TableEvent { - public DropPartitionEvent(long ctlId, long dbId, long tableId) { + public DropPartitionEvent(long ctlId, long dbId, long tableId) throws AnalysisException { super(EventType.DROP_PARTITION, ctlId, dbId, tableId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java index 371d5cd553c6e3..170388dc0625d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java @@ -17,8 +17,10 @@ package org.apache.doris.event; +import org.apache.doris.common.AnalysisException; + public class ReplacePartitionEvent extends TableEvent { - public ReplacePartitionEvent(long ctlId, long dbId, long tableId) { + public ReplacePartitionEvent(long ctlId, long dbId, long tableId) throws AnalysisException { super(EventType.REPLACE_PARTITION, ctlId, dbId, tableId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java index 210ad2df40f403..6252e8447c3e5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java @@ -17,16 +17,31 @@ package org.apache.doris.event; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.CatalogIf; + public abstract class TableEvent extends Event { protected final long ctlId; + protected final String ctlName; protected final long dbId; + protected final String dbName; protected final long tableId; + protected final String tableName; - public TableEvent(EventType eventType, long ctlId, long dbId, long tableId) { + public TableEvent(EventType eventType, long ctlId, long dbId, long tableId) throws AnalysisException { super(eventType); this.ctlId = ctlId; this.dbId = dbId; this.tableId = tableId; + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalogOrAnalysisException(ctlId); + DatabaseIf db = catalog.getDbOrAnalysisException(dbId); + TableIf table = db.getTableOrAnalysisException(tableId); + this.ctlName = catalog.getName(); + this.dbName = db.getFullName(); + this.tableName = table.getName(); } public long getCtlId() { @@ -41,12 +56,27 @@ public long getTableId() { return tableId; } + public String getCtlName() { + return ctlName; + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + @Override public String toString() { return "TableEvent{" + "ctlId=" + ctlId + + ", ctlName='" + ctlName + '\'' + ", dbId=" + dbId + + ", dbName='" + dbName + '\'' + ", tableId=" + tableId + + ", tableName='" + tableName + '\'' + "} " + super.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java index 865f9f35a66920..4b740b75ef8ce7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java @@ -182,9 +182,9 @@ public void processEvent(Event event) throws EventException { TableIf table; try { table = Env.getCurrentEnv().getCatalogMgr() - .getCatalogOrAnalysisException(tableEvent.getCtlId()) - .getDbOrAnalysisException(tableEvent.getDbId()) - .getTableOrAnalysisException(tableEvent.getTableId()); + .getCatalogOrAnalysisException(tableEvent.getCtlName()) + .getDbOrAnalysisException(tableEvent.getDbName()) + .getTableOrAnalysisException(tableEvent.getTableName()); } catch (AnalysisException e) { throw new EventException(e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 6b338d7f827f43..fa44bc30959692 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -1199,7 +1199,7 @@ private void setTableVersion(TransactionState transactionState, Database db) { } } - private void produceEvent(TransactionState transactionState, Database db) { + private void produceEvent(TransactionState transactionState, Database db) throws AnalysisException { Collection tableCommitInfos; if (!transactionState.getSubTxnIdToTableCommitInfo().isEmpty()) { tableCommitInfos = transactionState.getSubTxnTableCommitInfos(); From c5b317afdae2af2d37d5012d287cf853582799af Mon Sep 17 00:00:00 2001 From: zhangdong <493738387@qq.com> Date: Mon, 19 Aug 2024 15:00:55 +0800 Subject: [PATCH 11/11] equals check ctlName --- .../main/java/org/apache/doris/mtmv/BaseTableInfo.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java index a53646e48ba330..48796c3360773c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java @@ -119,8 +119,14 @@ public boolean equals(Object o) { return false; } BaseTableInfo that = (BaseTableInfo) o; - return Objects.equal(tableName, that.tableName) && Objects.equal( - dbName, that.dbName) && Objects.equal(ctlName, that.ctlName); + // for compatibility + if (StringUtils.isEmpty(ctlName) || StringUtils.isEmpty(that.ctlName)) { + return Objects.equal(tableId, that.tableId) && Objects.equal( + dbId, that.dbId) && Objects.equal(ctlId, that.ctlId); + } else { + return Objects.equal(tableName, that.tableName) && Objects.equal( + dbName, that.dbName) && Objects.equal(ctlName, that.ctlName); + } } @Override