diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index ced14ad4b8dc8a..f1879798a93ced 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -171,6 +171,7 @@ private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable, olapTable.checkNormalStateForAlter(); boolean needProcessOutsideTableLock = false; + String oldTableName = olapTable.getName(); if (currentAlterOps.checkTableStoragePolicy(alterClauses)) { String tableStoragePolicy = olapTable.getStoragePolicy(); String currentStoragePolicy = currentAlterOps.getTableStoragePolicy(alterClauses); @@ -284,7 +285,7 @@ private boolean processAlterOlapTable(AlterTableStmt stmt, OlapTable olapTable, throw new DdlException("Invalid alter operations: " + currentAlterOps); } if (needChangeMTMVState(alterClauses)) { - Env.getCurrentEnv().getMtmvService().alterTable(olapTable); + Env.getCurrentEnv().getMtmvService().alterTable(olapTable, oldTableName); } return needProcessOutsideTableLock; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 121228f569af23..0d456406e8c19e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -190,6 +190,7 @@ import org.apache.doris.mtmv.MTMVRelation; import org.apache.doris.mtmv.MTMVService; import org.apache.doris.mtmv.MTMVStatus; +import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.authenticate.AuthenticateType; import org.apache.doris.mysql.authenticate.AuthenticatorManager; import org.apache.doris.mysql.privilege.AccessControllerManager; @@ -1716,6 +1717,14 @@ public boolean postProcessAfterMetadataReplayed(boolean waitCatalogReady) { auth.rectifyPrivs(); catalogMgr.registerCatalogRefreshListener(this); + // MTMV needs to be compatible with old metadata, and during the compatibility process, + // it needs to wait for all catalog data to be ready, so it cannot be processed through gsonPostProcess() + // We catch all possible exceptions to avoid FE startup failure + try { + MTMVUtil.compatibleMTMV(catalogMgr); + } catch (Throwable t) { + LOG.warn("compatibleMTMV failed", t); + } return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index cd7583193e8c2b..1611951e64792c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -25,6 +25,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.extensions.mtmv.MTMVTask; import org.apache.doris.mtmv.MTMVCache; @@ -47,9 +48,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; -//import com.google.gson.JsonElement; -//import com.google.gson.JsonObject; -//import com.google.gson.JsonParser; import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; @@ -489,4 +487,21 @@ public String toInfoString() { sb.append('}'); return sb.toString(); } + + /** + * Previously, ID was used to store the related table of materialized views, + * but when the catalog is deleted, the ID will change, so name is used instead. + * The logic here is to be compatible with older versions by converting ID to name + */ + public void compatible(CatalogMgr catalogMgr) { + if (mvPartitionInfo != null) { + mvPartitionInfo.compatible(catalogMgr); + } + if (relation != null) { + relation.compatible(catalogMgr); + } + if (refreshSnapshot != null) { + refreshSnapshot.compatible(this); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java b/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java index d58e62bfddeb0e..1e1a265d704348 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java @@ -17,8 +17,10 @@ package org.apache.doris.event; +import org.apache.doris.common.AnalysisException; + public class DataChangeEvent extends TableEvent { - public DataChangeEvent(long ctlId, long dbId, long tableId) { + public DataChangeEvent(long ctlId, long dbId, long tableId) throws AnalysisException { super(EventType.DATA_CHANGE, ctlId, dbId, tableId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java index 67339ebd05ab55..598768aa8de724 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java @@ -17,8 +17,10 @@ package org.apache.doris.event; +import org.apache.doris.common.AnalysisException; + public class DropPartitionEvent extends TableEvent { - public DropPartitionEvent(long ctlId, long dbId, long tableId) { + public DropPartitionEvent(long ctlId, long dbId, long tableId) throws AnalysisException { super(EventType.DROP_PARTITION, ctlId, dbId, tableId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java index 371d5cd553c6e3..170388dc0625d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java @@ -17,8 +17,10 @@ package org.apache.doris.event; +import org.apache.doris.common.AnalysisException; + public class ReplacePartitionEvent extends TableEvent { - public ReplacePartitionEvent(long ctlId, long dbId, long tableId) { + public ReplacePartitionEvent(long ctlId, long dbId, long tableId) throws AnalysisException { super(EventType.REPLACE_PARTITION, ctlId, dbId, tableId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java index 210ad2df40f403..6252e8447c3e5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java @@ -17,16 +17,31 @@ package org.apache.doris.event; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.CatalogIf; + public abstract class TableEvent extends Event { protected final long ctlId; + protected final String ctlName; protected final long dbId; + protected final String dbName; protected final long tableId; + protected final String tableName; - public TableEvent(EventType eventType, long ctlId, long dbId, long tableId) { + public TableEvent(EventType eventType, long ctlId, long dbId, long tableId) throws AnalysisException { super(eventType); this.ctlId = ctlId; this.dbId = dbId; this.tableId = tableId; + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalogOrAnalysisException(ctlId); + DatabaseIf db = catalog.getDbOrAnalysisException(dbId); + TableIf table = db.getTableOrAnalysisException(tableId); + this.ctlName = catalog.getName(); + this.dbName = db.getFullName(); + this.tableName = table.getName(); } public long getCtlId() { @@ -41,12 +56,27 @@ public long getTableId() { return tableId; } + public String getCtlName() { + return ctlName; + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + @Override public String toString() { return "TableEvent{" + "ctlId=" + ctlId + + ", ctlName='" + ctlName + '\'' + ", dbId=" + dbId + + ", dbName='" + dbName + '\'' + ", tableId=" + tableId + + ", tableName='" + tableName + '\'' + "} " + super.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java index bc9a3fdd2050f1..48796c3360773c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java @@ -18,39 +18,44 @@ package org.apache.doris.mtmv; import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.InternalCatalog; import com.google.common.base.Objects; import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class BaseTableInfo { private static final Logger LOG = LogManager.getLogger(BaseTableInfo.class); + // The MTMV needs to record the name to avoid changing the ID after rebuilding the same named base table, + // which may make the materialized view unusable. + // The previous version stored the ID, so it is temporarily kept for compatibility with the old version @SerializedName("ti") + @Deprecated private long tableId; @SerializedName("di") + @Deprecated private long dbId; @SerializedName("ci") + @Deprecated private long ctlId; - public BaseTableInfo(long tableId, long dbId) { - this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is null"); - this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null"); - this.ctlId = InternalCatalog.INTERNAL_CATALOG_ID; - } - - public BaseTableInfo(long tableId, long dbId, long ctlId) { - this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is null"); - this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null"); - this.ctlId = java.util.Objects.requireNonNull(ctlId, "ctlId is null"); - } + @SerializedName("tn") + private String tableName; + @SerializedName("dn") + private String dbName; + @SerializedName("cn") + private String ctlName; public BaseTableInfo(TableIf table) { + java.util.Objects.requireNonNull(table, "table is null"); DatabaseIf database = table.getDatabase(); java.util.Objects.requireNonNull(database, "database is null"); CatalogIf catalog = database.getCatalog(); @@ -58,20 +63,53 @@ public BaseTableInfo(TableIf table) { this.tableId = table.getId(); this.dbId = database.getId(); this.ctlId = catalog.getId(); + this.tableName = table.getName(); + this.dbName = database.getFullName(); + this.ctlName = catalog.getName(); + } + + // for replay MTMV, can not use `table.getDatabase();`,because database not added to catalog + public BaseTableInfo(OlapTable table, long dbId) { + java.util.Objects.requireNonNull(table, "table is null"); + this.tableId = table.getId(); + this.dbId = dbId; + this.ctlId = InternalCatalog.INTERNAL_CATALOG_ID; + this.tableName = table.getName(); + this.dbName = table.getDBName(); + this.ctlName = InternalCatalog.INTERNAL_CATALOG_NAME; + } + + public String getTableName() { + return tableName; + } + + public String getDbName() { + return dbName; + } + + public String getCtlName() { + return ctlName; } + @Deprecated public long getTableId() { return tableId; } + @Deprecated public long getDbId() { return dbId; } + @Deprecated public long getCtlId() { return ctlId; } + public void setTableName(String tableName) { + this.tableName = tableName; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -81,31 +119,43 @@ public boolean equals(Object o) { return false; } BaseTableInfo that = (BaseTableInfo) o; - return Objects.equal(tableId, that.tableId) - && Objects.equal(dbId, that.dbId) - && Objects.equal(ctlId, that.ctlId); + // for compatibility + if (StringUtils.isEmpty(ctlName) || StringUtils.isEmpty(that.ctlName)) { + return Objects.equal(tableId, that.tableId) && Objects.equal( + dbId, that.dbId) && Objects.equal(ctlId, that.ctlId); + } else { + return Objects.equal(tableName, that.tableName) && Objects.equal( + dbName, that.dbName) && Objects.equal(ctlName, that.ctlName); + } } @Override public int hashCode() { - return Objects.hashCode(tableId, dbId, ctlId); + return Objects.hashCode(tableName, dbName, ctlName); } @Override public String toString() { return "BaseTableInfo{" - + "tableId=" + tableId - + ", dbId=" + dbId - + ", ctlId=" + ctlId + + "tableName='" + tableName + '\'' + + ", dbName='" + dbName + '\'' + + ", ctlName='" + ctlName + '\'' + '}'; } - public String getTableName() { + public void compatible(CatalogMgr catalogMgr) { + if (!StringUtils.isEmpty(ctlName)) { + return; + } try { - return MTMVUtil.getTable(this).getName(); + CatalogIf catalog = catalogMgr.getCatalogOrAnalysisException(ctlId); + DatabaseIf db = catalog.getDbOrAnalysisException(dbId); + TableIf table = db.getTableOrAnalysisException(tableId); + this.ctlName = catalog.getName(); + this.dbName = db.getFullName(); + this.tableName = table.getName(); } catch (AnalysisException e) { - LOG.warn("can not get table: " + this); - return ""; + LOG.warn("MTMV compatible failed, ctlId: {}, dbId: {}, tableId: {}", ctlId, dbId, tableId, e); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java index d9ab9984581227..e0edd06f8c9418 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVHookService.java @@ -104,7 +104,7 @@ public interface MTMVHookService { * * @param table */ - void alterTable(Table table); + void alterTable(Table table, String oldTableName); /** * Triggered when pause mtmv diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index bed44e8d37d136..11089899b309a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -188,7 +188,7 @@ public void dropTable(Table table) { } @Override - public void alterTable(Table table) { + public void alterTable(Table table, String oldTableName) { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java index ff4060f334a952..b3cd239269abc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.catalog.Column; import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.CatalogMgr; import com.google.gson.annotations.SerializedName; @@ -149,4 +150,11 @@ public String toNameString() { + '}'; } } + + public void compatible(CatalogMgr catalogMgr) { + if (relatedTable == null) { + return; + } + relatedTable.compatible(catalogMgr); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index e6a89007310d6e..2a0863a6e61101 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -448,7 +448,7 @@ private static boolean isSyncWithBaseTable(MTMVRefreshContext context, String mt } MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context); return mtmv.getRefreshSnapshot() - .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot); + .equalsWithBaseTable(mtmvPartitionName, new BaseTableInfo(baseTable), baseTableCurrentSnapshot); } /** @@ -496,8 +496,8 @@ private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMVRefres if (!(table instanceof MTMVRelatedTableIf)) { continue; } - refreshPartitionSnapshot.getTables() - .put(table.getId(), ((MTMVRelatedTableIf) table).getTableSnapshot(context)); + refreshPartitionSnapshot.addTableSnapshot(baseTableInfo, + ((MTMVRelatedTableIf) table).getTableSnapshot(context)); } return refreshPartitionSnapshot; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java index 2336c3922ea4b1..63bbfc2e037084 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java @@ -17,35 +17,88 @@ package org.apache.doris.mtmv; +import org.apache.doris.catalog.MTMV; + import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; +import org.apache.commons.collections.CollectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; public class MTMVRefreshPartitionSnapshot { + private static final Logger LOG = LogManager.getLogger(MTMV.class); @SerializedName("p") private Map partitions; + // old version only persist table id, we need `BaseTableInfo`, `tables` only for compatible old version @SerializedName("t") + @Deprecated private Map tables; + @SerializedName("ti") + private Map tablesInfo; public MTMVRefreshPartitionSnapshot() { this.partitions = Maps.newConcurrentMap(); this.tables = Maps.newConcurrentMap(); + this.tablesInfo = Maps.newConcurrentMap(); } public Map getPartitions() { return partitions; } - public Map getTables() { - return tables; + public MTMVSnapshotIf getTableSnapshot(BaseTableInfo table) { + if (tablesInfo.containsKey(table)) { + return tablesInfo.get(table); + } + // for compatible old version + return tables.get(table.getTableId()); + } + + public void addTableSnapshot(BaseTableInfo baseTableInfo, MTMVSnapshotIf tableSnapshot) { + tablesInfo.put(baseTableInfo, tableSnapshot); + // for compatible old version + tables.put(baseTableInfo.getTableId(), tableSnapshot); } @Override public String toString() { return "MTMVRefreshPartitionSnapshot{" + "partitions=" + partitions - + ", tables=" + tables + + ", tablesInfo=" + tablesInfo + '}'; } + + public void compatible(MTMV mtmv) { + if (tables.size() == tablesInfo.size()) { + return; + } + MTMVRelation relation = mtmv.getRelation(); + if (relation == null || CollectionUtils.isEmpty(relation.getBaseTablesOneLevel())) { + return; + } + for (Entry entry : tables.entrySet()) { + Optional tableInfo = getByTableId(entry.getKey(), + relation.getBaseTablesOneLevel()); + if (tableInfo.isPresent()) { + tablesInfo.put(tableInfo.get(), entry.getValue()); + } else { + LOG.warn("MTMV compatible failed, tableId: {}, relationTables: {}", entry.getKey(), + relation.getBaseTablesOneLevel()); + } + } + } + + private Optional getByTableId(Long tableId, Set baseTables) { + for (BaseTableInfo info : baseTables) { + if (info.getTableId() == tableId) { + return Optional.of(info); + } + } + return Optional.empty(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java index d48911275e886b..74fc3cc1c5cfb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java @@ -17,6 +17,8 @@ package org.apache.doris.mtmv; +import org.apache.doris.catalog.MTMV; + import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; @@ -55,13 +57,13 @@ public Set getSnapshotPartitions(String mtmvPartitionName) { return partitionSnapshot.getPartitions().keySet(); } - public boolean equalsWithBaseTable(String mtmvPartitionName, long baseTableId, + public boolean equalsWithBaseTable(String mtmvPartitionName, BaseTableInfo tableInfo, MTMVSnapshotIf baseTableCurrentSnapshot) { MTMVRefreshPartitionSnapshot partitionSnapshot = partitionSnapshots.get(mtmvPartitionName); if (partitionSnapshot == null) { return false; } - MTMVSnapshotIf relatedPartitionSnapshot = partitionSnapshot.getTables().get(baseTableId); + MTMVSnapshotIf relatedPartitionSnapshot = partitionSnapshot.getTableSnapshot(tableInfo); if (relatedPartitionSnapshot == null) { return false; } @@ -88,4 +90,13 @@ public String toString() { + "partitionSnapshots=" + partitionSnapshots + '}'; } + + public void compatible(MTMV mtmv) { + if (MapUtils.isEmpty(partitionSnapshots)) { + return; + } + for (MTMVRefreshPartitionSnapshot snapshot : partitionSnapshots.values()) { + snapshot.compatible(mtmv); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelation.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelation.java index aec89caa508423..87a0199f128f88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelation.java @@ -17,7 +17,10 @@ package org.apache.doris.mtmv; +import org.apache.doris.datasource.CatalogMgr; + import com.google.gson.annotations.SerializedName; +import org.apache.commons.collections.CollectionUtils; import java.util.Set; @@ -61,4 +64,19 @@ public String toInfoString() { + ", baseViews=" + baseViews + '}'; } + + public void compatible(CatalogMgr catalogMgr) { + compatible(catalogMgr, baseTables); + compatible(catalogMgr, baseViews); + compatible(catalogMgr, baseTablesOneLevel); + } + + private void compatible(CatalogMgr catalogMgr, Set infos) { + if (CollectionUtils.isEmpty(infos)) { + return; + } + for (BaseTableInfo baseTableInfo : infos) { + baseTableInfo.compatible(catalogMgr); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java index b5f8bbbf663d26..436427526ba08b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java @@ -187,7 +187,7 @@ public void dropMTMV(MTMV mtmv) throws DdlException { */ @Override public void registerMTMV(MTMV mtmv, Long dbId) { - refreshMTMVCache(mtmv.getRelation(), new BaseTableInfo(mtmv.getId(), dbId)); + refreshMTMVCache(mtmv.getRelation(), new BaseTableInfo(mtmv, dbId)); } /** @@ -232,7 +232,7 @@ public void refreshComplete(MTMV mtmv, MTMVRelation relation, MTMVTask task) { */ @Override public void dropTable(Table table) { - processBaseTableChange(table, "The base table has been deleted:"); + processBaseTableChange(new BaseTableInfo(table), "The base table has been deleted:"); } /** @@ -241,8 +241,10 @@ public void dropTable(Table table) { * @param table */ @Override - public void alterTable(Table table) { - processBaseTableChange(table, "The base table has been updated:"); + public void alterTable(Table table, String oldTableName) { + BaseTableInfo baseTableInfo = new BaseTableInfo(table); + baseTableInfo.setTableName(oldTableName); + processBaseTableChange(baseTableInfo, "The base table has been updated:"); } @Override @@ -260,8 +262,7 @@ public void cancelMTMVTask(CancelMTMVTaskInfo info) { } - private void processBaseTableChange(Table table, String msgPrefix) { - BaseTableInfo baseTableInfo = new BaseTableInfo(table); + private void processBaseTableChange(BaseTableInfo baseTableInfo, String msgPrefix) { Set mtmvsByBaseTable = getMtmvsByBaseTable(baseTableInfo); if (CollectionUtils.isEmpty(mtmvsByBaseTable)) { return; @@ -269,9 +270,7 @@ private void processBaseTableChange(Table table, String msgPrefix) { for (BaseTableInfo mtmvInfo : mtmvsByBaseTable) { Table mtmv = null; try { - mtmv = Env.getCurrentEnv().getInternalCatalog() - .getDbOrAnalysisException(mtmvInfo.getDbId()) - .getTableOrAnalysisException(mtmvInfo.getTableId()); + mtmv = (Table) MTMVUtil.getTable(mtmvInfo); } catch (AnalysisException e) { LOG.warn(e); continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java index d5d86b7eedab97..4b740b75ef8ce7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java @@ -17,8 +17,10 @@ package org.apache.doris.mtmv; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -128,11 +130,11 @@ public void dropTable(Table table) { } } - public void alterTable(Table table) { + public void alterTable(Table table, String oldTableName) { Objects.requireNonNull(table); LOG.info("alterTable, tableName: {}", table.getName()); for (MTMVHookService mtmvHookService : hooks.values()) { - mtmvHookService.alterTable(table); + mtmvHookService.alterTable(table, oldTableName); } } @@ -177,12 +179,21 @@ public void processEvent(Event event) throws EventException { } TableEvent tableEvent = (TableEvent) event; LOG.info("processEvent, Event: {}", event); + TableIf table; + try { + table = Env.getCurrentEnv().getCatalogMgr() + .getCatalogOrAnalysisException(tableEvent.getCtlName()) + .getDbOrAnalysisException(tableEvent.getDbName()) + .getTableOrAnalysisException(tableEvent.getTableName()); + } catch (AnalysisException e) { + throw new EventException(e); + } Set mtmvs = relationManager.getMtmvsByBaseTableOneLevel( - new BaseTableInfo(tableEvent.getTableId(), tableEvent.getDbId(), tableEvent.getCtlId())); + new BaseTableInfo(table)); for (BaseTableInfo baseTableInfo : mtmvs) { try { // check if mtmv should trigger by event - MTMV mtmv = MTMVUtil.getMTMV(baseTableInfo.getDbId(), baseTableInfo.getTableId()); + MTMV mtmv = (MTMV) MTMVUtil.getTable(baseTableInfo); if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT)) { jobManager.onCommit(mtmv); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java index 4868ef94a1b570..e84136489291f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -27,6 +27,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.CatalogMgr; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeExtractAndTransform; @@ -37,6 +38,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import java.util.List; import java.util.Optional; @@ -52,11 +54,18 @@ public class MTMVUtil { * @throws AnalysisException */ public static TableIf getTable(BaseTableInfo baseTableInfo) throws AnalysisException { - TableIf table = Env.getCurrentEnv().getCatalogMgr() - .getCatalogOrAnalysisException(baseTableInfo.getCtlId()) - .getDbOrAnalysisException(baseTableInfo.getDbId()) - .getTableOrAnalysisException(baseTableInfo.getTableId()); - return table; + // for compatible old version, not have name + if (StringUtils.isEmpty(baseTableInfo.getCtlName())) { + return Env.getCurrentEnv().getCatalogMgr() + .getCatalogOrAnalysisException(baseTableInfo.getCtlId()) + .getDbOrAnalysisException(baseTableInfo.getDbId()) + .getTableOrAnalysisException(baseTableInfo.getTableId()); + } else { + return Env.getCurrentEnv().getCatalogMgr() + .getCatalogOrAnalysisException(baseTableInfo.getCtlName()) + .getDbOrAnalysisException(baseTableInfo.getDbName()) + .getTableOrAnalysisException(baseTableInfo.getTableName()); + } } public static MTMVRelatedTableIf getRelatedTable(BaseTableInfo baseTableInfo) { @@ -87,7 +96,7 @@ public static MTMV getMTMV(long dbId, long mtmvId) throws DdlException, MetaNotF public static boolean mtmvContainsExternalTable(MTMV mtmv) { Set baseTables = mtmv.getRelation().getBaseTablesOneLevel(); for (BaseTableInfo baseTableInfo : baseTables) { - if (baseTableInfo.getCtlId() != InternalCatalog.INTERNAL_CATALOG_ID) { + if (!baseTableInfo.getCtlName().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) { return true; } } @@ -151,4 +160,16 @@ public static void checkModifyMTMVData(Database db, List tableIdList, Conn } } } + + public static void compatibleMTMV(CatalogMgr catalogMgr) { + List dbs = catalogMgr.getInternalCatalog().getDbs(); + for (Database database : dbs) { + List tables = database.getTables(); + for (Table table : tables) { + if (table instanceof MTMV) { + ((MTMV) table).compatible(catalogMgr); + } + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVRenameInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVRenameInfo.java index c86626b5920cca..066342c3b2c706 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVRenameInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVRenameInfo.java @@ -61,6 +61,6 @@ public void run() throws DdlException { Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb()); Table table = db.getTableOrDdlException(mvName.getTbl()); Env.getCurrentEnv().renameTable(db, table, newName); - Env.getCurrentEnv().getMtmvService().alterTable(table); + Env.getCurrentEnv().getMtmvService().alterTable(table, mvName.getTbl()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVReplaceInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVReplaceInfo.java index 440db1e1400cdc..6dd0907db62063 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVReplaceInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AlterMTMVReplaceInfo.java @@ -90,9 +90,9 @@ public void run() throws UserException { MTMV mtmv = (MTMV) db.getTableOrDdlException(mvName.getTbl(), TableType.MATERIALIZED_VIEW); MTMV newMtmv = (MTMV) db.getTableOrDdlException(newName, TableType.MATERIALIZED_VIEW); Env.getCurrentEnv().getAlterInstance().processReplaceTable(db, mtmv, newName, swapTable); - Env.getCurrentEnv().getMtmvService().alterTable(newMtmv); + Env.getCurrentEnv().getMtmvService().alterTable(newMtmv, mvName.getTbl()); if (swapTable) { - Env.getCurrentEnv().getMtmvService().alterTable(mtmv); + Env.getCurrentEnv().getMtmvService().alterTable(mtmv, newName); } else { Env.getCurrentEnv().getMtmvService().dropMTMV(mtmv); Env.getCurrentEnv().getMtmvService().dropTable(mtmv); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 6b338d7f827f43..fa44bc30959692 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -1199,7 +1199,7 @@ private void setTableVersion(TransactionState transactionState, Database db) { } } - private void produceEvent(TransactionState transactionState, Database db) { + private void produceEvent(TransactionState transactionState, Database db) throws AnalysisException { Collection tableCommitInfos; if (!transactionState.getSubTxnIdToTableCommitInfo().isEmpty()) { tableCommitInfos = transactionState.getSubTxnTableCommitInfos(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java index 63a75c724988ed..997385742dc09a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -120,7 +120,7 @@ public void setUp() throws NoSuchMethodException, SecurityException, AnalysisExc minTimes = 0; result = refreshSnapshot; - refreshSnapshot.equalsWithBaseTable(anyString, anyLong, (MTMVSnapshotIf) any); + refreshSnapshot.equalsWithBaseTable(anyString, (BaseTableInfo) any, (MTMVSnapshotIf) any); minTimes = 0; result = true; @@ -157,7 +157,7 @@ public void testIsMTMVSyncNormal() { public void testIsMTMVSyncNotSync() { new Expectations() { { - refreshSnapshot.equalsWithBaseTable(anyString, anyLong, (MTMVSnapshotIf) any); + refreshSnapshot.equalsWithBaseTable(anyString, (BaseTableInfo) any, (MTMVSnapshotIf) any); minTimes = 0; result = false; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java index 42b5b7838419ed..1890f9c9805926 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRefreshSnapshotTest.java @@ -21,6 +21,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import mockit.Expectations; +import mockit.Mocked; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -35,14 +37,54 @@ public class MTMVRefreshSnapshotTest { private MTMVRefreshSnapshot refreshSnapshot = new MTMVRefreshSnapshot(); private MTMVVersionSnapshot p1Snapshot = new MTMVVersionSnapshot(correctVersion); private MTMVVersionSnapshot t1Snapshot = new MTMVVersionSnapshot(correctVersion); + @Mocked + private BaseTableInfo existTable; + @Mocked + private BaseTableInfo nonExistTable; @Before public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException { + new Expectations() { + { + existTable.getCtlName(); + minTimes = 0; + result = "ctl1"; + + existTable.getDbName(); + minTimes = 0; + result = "db1"; + + existTable.getTableName(); + minTimes = 0; + result = "t1"; + + existTable.getTableId(); + minTimes = 0; + result = 1L; + + nonExistTable.getCtlName(); + minTimes = 0; + result = "ctl1"; + + nonExistTable.getDbName(); + minTimes = 0; + result = "db1"; + + nonExistTable.getTableName(); + minTimes = 0; + result = "t2"; + + nonExistTable.getTableId(); + minTimes = 0; + result = 2L; + } + }; + Map partitionSnapshots = Maps.newHashMap(); MTMVRefreshPartitionSnapshot mvp1PartitionSnapshot = new MTMVRefreshPartitionSnapshot(); partitionSnapshots.put(mvExistPartitionName, mvp1PartitionSnapshot); mvp1PartitionSnapshot.getPartitions().put(relatedExistPartitionName, p1Snapshot); - mvp1PartitionSnapshot.getTables().put(baseExistTableId, t1Snapshot); + mvp1PartitionSnapshot.addTableSnapshot(existTable, t1Snapshot); refreshSnapshot.updateSnapshots(partitionSnapshots, Sets.newHashSet(mvExistPartitionName)); } @@ -73,23 +115,23 @@ public void testPartitionSync() { @Test public void testTableSync() { // normal - boolean sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, baseExistTableId, + boolean sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, existTable, new MTMVVersionSnapshot(correctVersion)); Assert.assertTrue(sync); // non exist mv partition sync = refreshSnapshot - .equalsWithBaseTable("mvp2", baseExistTableId, new MTMVVersionSnapshot(correctVersion)); + .equalsWithBaseTable("mvp2", existTable, new MTMVVersionSnapshot(correctVersion)); Assert.assertFalse(sync); // non exist related partition sync = refreshSnapshot - .equalsWithBaseTable(mvExistPartitionName, 2L, new MTMVVersionSnapshot(correctVersion)); + .equalsWithBaseTable(mvExistPartitionName, nonExistTable, new MTMVVersionSnapshot(correctVersion)); Assert.assertFalse(sync); // snapshot value not equal sync = refreshSnapshot - .equalsWithBaseTable(mvExistPartitionName, baseExistTableId, new MTMVVersionSnapshot(2L)); + .equalsWithBaseTable(mvExistPartitionName, existTable, new MTMVVersionSnapshot(2L)); Assert.assertFalse(sync); // snapshot type not equal - sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, baseExistTableId, + sync = refreshSnapshot.equalsWithBaseTable(mvExistPartitionName, existTable, new MTMVTimestampSnapshot(correctVersion)); Assert.assertFalse(sync); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelationManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelationManagerTest.java index 697643337c2391..40263705c43f99 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelationManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelationManagerTest.java @@ -17,22 +17,87 @@ package org.apache.doris.mtmv; +import org.apache.doris.common.AnalysisException; + import com.google.common.collect.Sets; +import mockit.Expectations; +import mockit.Mocked; import org.apache.commons.collections.CollectionUtils; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import java.util.Set; public class MTMVRelationManagerTest { + @Mocked + private BaseTableInfo mv1; + @Mocked + private BaseTableInfo mv2; + @Mocked + private BaseTableInfo t3; + @Mocked + private BaseTableInfo t4; + + @Before + public void setUp() throws NoSuchMethodException, SecurityException, AnalysisException { + new Expectations() { + { + mv1.getCtlName(); + minTimes = 0; + result = "ctl1"; + + mv1.getDbName(); + minTimes = 0; + result = "db1"; + + mv1.getTableName(); + minTimes = 0; + result = "mv1"; + + mv2.getCtlName(); + minTimes = 0; + result = "ctl1"; + + mv2.getDbName(); + minTimes = 0; + result = "db1"; + + mv2.getTableName(); + minTimes = 0; + result = "mv2"; + + t3.getCtlName(); + minTimes = 0; + result = "ctl1"; + + t3.getDbName(); + minTimes = 0; + result = "db1"; + + t3.getTableName(); + minTimes = 0; + result = "t3"; + + t4.getCtlName(); + minTimes = 0; + result = "ctl1"; + + t4.getDbName(); + minTimes = 0; + result = "db1"; + + t4.getTableName(); + minTimes = 0; + result = "t4"; + } + }; + } + @Test public void testGetMtmvsByBaseTableOneLevel() { // mock mv2==>mv1,t3; mv1==>t4 MTMVRelationManager manager = new MTMVRelationManager(); - BaseTableInfo mv1 = new BaseTableInfo(0L, 1L); - BaseTableInfo mv2 = new BaseTableInfo(0L, 2L); - BaseTableInfo t3 = new BaseTableInfo(0L, 3L); - BaseTableInfo t4 = new BaseTableInfo(0L, 4L); MTMVRelation mv2Relation = new MTMVRelation(Sets.newHashSet(mv1, t3, t4), Sets.newHashSet(mv1, t3), Sets.newHashSet()); MTMVRelation mv1Relation = new MTMVRelation(Sets.newHashSet(t4), Sets.newHashSet(t4), @@ -68,10 +133,6 @@ public void testGetMtmvsByBaseTableOneLevel() { public void testGetMtmvsByBaseTable() { // mock mv2==>mv1,t3; mv1==>t4 MTMVRelationManager manager = new MTMVRelationManager(); - BaseTableInfo mv1 = new BaseTableInfo(0L, 1L); - BaseTableInfo mv2 = new BaseTableInfo(0L, 2L); - BaseTableInfo t3 = new BaseTableInfo(0L, 3L); - BaseTableInfo t4 = new BaseTableInfo(0L, 4L); MTMVRelation mv2Relation = new MTMVRelation(Sets.newHashSet(mv1, t3, t4), Sets.newHashSet(mv1, t3), Sets.newHashSet()); MTMVRelation mv1Relation = new MTMVRelation(Sets.newHashSet(t4), Sets.newHashSet(t4), diff --git a/regression-test/data/mtmv_p0/test_hive_mtmv.out b/regression-test/data/mtmv_p0/test_hive_mtmv.out index 50c8016c87d087..1176f9320ce00f 100644 --- a/regression-test/data/mtmv_p0/test_hive_mtmv.out +++ b/regression-test/data/mtmv_p0/test_hive_mtmv.out @@ -4,12 +4,21 @@ 2 B 20230101 3 C 20230101 --- !refresh_other_partition -- +-- !refresh_complete -- 1 A 20230101 2 B 20230101 3 C 20230101 +4 D 20230102 +5 E 20230102 +6 F 20230102 --- !refresh_complete -- +-- !is_sync_before_rebuild -- +true + +-- !is_sync_after_rebuild -- +true + +-- !refresh_complete_rebuild -- 1 A 20230101 2 B 20230101 3 C 20230101 @@ -22,12 +31,21 @@ 2 B 20230101 3 C 20230101 --- !refresh_other_partition -- +-- !refresh_complete -- 1 A 20230101 2 B 20230101 3 C 20230101 +4 D 20230102 +5 E 20230102 +6 F 20230102 --- !refresh_complete -- +-- !is_sync_before_rebuild -- +true + +-- !is_sync_after_rebuild -- +true + +-- !refresh_complete_rebuild -- 1 A 20230101 2 B 20230101 3 C 20230101 diff --git a/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy index 872d7bf8ec0369..4ac5ad9e890463 100644 --- a/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_hive_mtmv.groovy @@ -68,8 +68,24 @@ suite("test_hive_mtmv", "p0,external,hive,external_docker,external_docker_hive") waitingMTMVTaskFinished(jobName) order_qt_refresh_complete "SELECT * FROM ${mvName} order by id" - sql """drop materialized view if exists ${mvName};""" + order_qt_is_sync_before_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + // rebuild catalog, should not Affects MTMV + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}' + );""" + + order_qt_is_sync_after_rebuild "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + // should refresh normal after catalog rebuild + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + waitingMTMVTaskFinished(jobName) + order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} order by id" + + sql """drop materialized view if exists ${mvName};""" sql """drop catalog if exists ${catalog_name}""" } finally { }