Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ private void modifyViewDef(Database db, View view, String inlineViewDef, long sq
db.registerTable(view);
AlterViewInfo alterViewInfo = new AlterViewInfo(db.getId(), view.getId(),
inlineViewDef, newFullSchema, sqlMode, comment);
Env.getCurrentEnv().getMtmvService().alterView(new BaseTableInfo(view));
Env.getCurrentEnv().getEditLog().logModifyViewDef(alterViewInfo);
LOG.info("modify view[{}] definition to {}", viewName, inlineViewDef);
} finally {
Expand Down Expand Up @@ -863,7 +864,7 @@ public void replayModifyViewDef(AlterViewInfo alterViewInfo) throws MetaNotFound

db.unregisterTable(viewName);
db.registerTable(view);

Env.getCurrentEnv().getMtmvService().alterView(new BaseTableInfo(view));
LOG.info("replay modify view[{}] definition to {}", viewName, inlineViewDef);
} finally {
view.writeUnlock();
Expand Down Expand Up @@ -1236,6 +1237,7 @@ public AlterHandler getSystemHandler() {
public void processAlterMTMV(AlterMTMV alterMTMV, boolean isReplay) {
TableNameInfo tbl = alterMTMV.getMvName();
MTMV mtmv = null;
boolean alterSuccess = true;
try {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(tbl.getDb());
mtmv = (MTMV) db.getTableOrMetaException(tbl.getTbl(), TableType.MATERIALIZED_VIEW);
Expand All @@ -1250,7 +1252,8 @@ public void processAlterMTMV(AlterMTMV alterMTMV, boolean isReplay) {
mtmv.alterMvProperties(alterMTMV.getMvProperties());
break;
case ADD_TASK:
mtmv.addTaskResult(alterMTMV.getTask(), alterMTMV.getRelation(), alterMTMV.getPartitionSnapshots(),
alterSuccess = mtmv.addTaskResult(alterMTMV.getTask(), alterMTMV.getRelation(),
alterMTMV.getPartitionSnapshots(),
isReplay);
// If it is not a replay thread, it means that the current service is already a new version
// and does not require compatibility
Expand All @@ -1265,7 +1268,7 @@ public void processAlterMTMV(AlterMTMV alterMTMV, boolean isReplay) {
Env.getCurrentEnv().getMtmvService().alterJob(mtmv, isReplay);
}
// 4. log it and replay it in the follower
if (!isReplay) {
if (!isReplay && alterSuccess) {
Env.getCurrentEnv().getEditLog().logAlterMTMV(alterMTMV);
}
} catch (UserException e) {
Expand Down
34 changes: 33 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class MTMV extends OlapTable {
private MTMVRefreshSnapshot refreshSnapshot;
// Should update after every fresh, not persist
private MTMVCache cache;
private long schemaChangeVersion;

// For deserialization
public MTMV() {
Expand Down Expand Up @@ -179,13 +180,26 @@ public MTMVStatus alterStatus(MTMVStatus newStatus) {
writeMvLock();
try {
// only can update state, refresh state will be change by add task
this.schemaChangeVersion++;
return this.status.updateStateAndDetail(newStatus);
} finally {
writeMvUnlock();
}
}

public void addTaskResult(MTMVTask task, MTMVRelation relation,
public void processBaseViewChange(String schemaChangeDetail) {
writeMvLock();
try {
this.schemaChangeVersion++;
this.status.setState(MTMVState.SCHEMA_CHANGE);
this.status.setSchemaChangeDetail(schemaChangeDetail);
this.refreshSnapshot = new MTMVRefreshSnapshot();
} finally {
writeMvUnlock();
}
}

public boolean addTaskResult(MTMVTask task, MTMVRelation relation,
Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots, boolean isReplay) {
MTMVCache mtmvCache = null;
boolean needUpdateCache = false;
Expand All @@ -209,6 +223,14 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
}
writeMvLock();
try {
if (!isReplay && task.getMtmvSchemaChangeVersion() != this.schemaChangeVersion) {
LOG.warn(
"addTaskResult failed, schemaChangeVersion has changed. "
+ "mvName: {}, taskId: {}, taskSchemaChangeVersion: {}, "
+ "mvSchemaChangeVersion: {}",
name, task.getTaskId(), task.getMtmvSchemaChangeVersion(), this.schemaChangeVersion);
return false;
}
if (task.getStatus() == TaskStatus.SUCCESS) {
this.status.setState(MTMVState.NORMAL);
this.status.setSchemaChangeDetail(null);
Expand All @@ -224,6 +246,7 @@ public void addTaskResult(MTMVTask task, MTMVRelation relation,
this.refreshSnapshot.updateSnapshots(partitionSnapshots, getPartitionNames());
Env.getCurrentEnv().getMtmvService()
.refreshComplete(this, relation, task);
return true;
} finally {
writeMvUnlock();
}
Expand Down Expand Up @@ -372,6 +395,15 @@ public MTMVRefreshSnapshot getRefreshSnapshot() {
return refreshSnapshot;
}

public long getSchemaChangeVersion() {
readMvLock();
try {
return schemaChangeVersion;
} finally {
readMvUnlock();
}
}

/**
* generateMvPartitionDescs
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.es.EsRepository;
import org.apache.doris.event.DropPartitionEvent;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.commands.DropCatalogRecycleBinCommand.IdType;
Expand Down Expand Up @@ -989,8 +990,9 @@ private void dropTableInternal(Database db, Table table, boolean isView, boolean
} finally {
table.writeUnlock();
}

Env.getCurrentEnv().getMtmvService().dropTable(table);
if (table instanceof OlapTable) {
Env.getCurrentEnv().getMtmvService().dropTable(table);
}
if (Config.isCloudMode()) {
((CloudGlobalTransactionMgr) Env.getCurrentGlobalTransactionMgr())
.clearTableLastTxnId(db.getId(), table.getId());
Expand All @@ -1017,6 +1019,9 @@ public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop,
if (table instanceof MTMV) {
Env.getCurrentEnv().getMtmvService().dropJob((MTMV) table, isReplay);
}
if (table instanceof View) {
Env.getCurrentEnv().getMtmvService().dropView(new BaseTableInfo(table));
}
Env.getCurrentEnv().getAnalysisManager().removeTableStats(table.getId());
Env.getCurrentEnv().getDictionaryManager().dropTableDictionaries(db.getName(), table.getName());
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentInternalCatalog().getId(), db.getId(), table.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
Expand Down Expand Up @@ -59,7 +58,6 @@
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -90,7 +88,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class MTMVTask extends AbstractTask {
private static final Logger LOG = LogManager.getLogger(MTMVTask.class);
Expand Down Expand Up @@ -158,6 +155,7 @@ public enum MTMVTaskRefreshMode {
private MTMVRelation relation;
private StmtExecutor executor;
private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;
private long mtmvSchemaChangeVersion;

private Map<MvccTableInfo, MvccSnapshot> snapshots = Maps.newHashMap();

Expand All @@ -182,6 +180,7 @@ public void run() throws JobException {
if (LOG.isDebugEnabled()) {
LOG.debug("mtmv task run, taskId: {}", super.getTaskId());
}
mtmvSchemaChangeVersion = mtmv.getSchemaChangeVersion();
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv, MTMVPlanUtil.DISABLE_RULES_WHEN_RUN_MTMV_TASK);
try {
if (LOG.isDebugEnabled()) {
Expand All @@ -193,10 +192,10 @@ public void run() throws JobException {
}
// Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
// such as deleting a table and creating a view with the same name
Set<TableIf> tablesInPlan = MTMVPlanUtil.getBaseTableFromQuery(mtmv.getQuerySql(), ctx);
this.relation = MTMVPlanUtil.generateMTMVRelation(tablesInPlan, ctx);
Pair<Set<TableIf>, Set<TableIf>> tablesInPlan = MTMVPlanUtil.getBaseTableFromQuery(mtmv.getQuerySql(), ctx);
this.relation = MTMVPlanUtil.generateMTMVRelation(tablesInPlan.first, tablesInPlan.second);
beforeMTMVRefresh();
List<TableIf> tableIfs = Lists.newArrayList(tablesInPlan);
List<TableIf> tableIfs = Lists.newArrayList(tablesInPlan.first);
tableIfs.sort(Comparator.comparing(TableIf::getId));

MTMVRefreshContext context;
Expand All @@ -207,7 +206,7 @@ public void run() throws JobException {
// if mtmv is schema_change, check if column type has changed
// If it's not in the schema_change state, the column type definitely won't change.
if (MTMVState.SCHEMA_CHANGE.equals(mtmv.getStatus().getState())) {
checkColumnTypeIfChange(mtmv, ctx);
MTMVPlanUtil.ensureMTMVQueryUsable(mtmv, ctx);
}
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
Expand Down Expand Up @@ -254,7 +253,8 @@ public void run() throws JobException {
.subList(start, Math.min(end, needRefreshPartitions.size())));
// need get names before exec
Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil
.generatePartitionSnapshots(context, relation.getBaseTablesOneLevel(), execPartitionNames);
.generatePartitionSnapshots(context, relation.getBaseTablesOneLevelAndFromView(),
execPartitionNames);
try {
executeWithRetry(execPartitionNames, tableWithPartKey);
} catch (Exception e) {
Expand All @@ -277,39 +277,6 @@ public void run() throws JobException {
}
}

private void checkColumnTypeIfChange(MTMV mtmv, ConnectContext ctx) throws JobException {
List<ColumnDefinition> currentColumnsDefinition = MTMVPlanUtil.generateColumnsBySql(mtmv.getQuerySql(), ctx,
mtmv.getMvPartitionInfo().getPartitionCol(),
mtmv.getDistributionColumnNames(), null, mtmv.getTableProperty().getProperties());
List<Column> currentColumns = currentColumnsDefinition.stream()
.map(ColumnDefinition::translateToCatalogStyle)
.collect(Collectors.toList());
List<Column> originalColumns = mtmv.getBaseSchema(true);
if (currentColumns.size() != originalColumns.size()) {
throw new JobException(String.format(
"column length not equals, please check whether columns of base table have changed, "
+ "original length is: %s, current length is: %s",
originalColumns.size(), currentColumns.size()));
}
for (int i = 0; i < originalColumns.size(); i++) {
if (!isTypeLike(originalColumns.get(i).getType(), currentColumns.get(i).getType())) {
throw new JobException(String.format(
"column type not same, please check whether columns of base table have changed, "
+ "column name is: %s, original type is: %s, current type is: %s",
originalColumns.get(i).getName(), originalColumns.get(i).getType().toSql(),
currentColumns.get(i).getType().toSql()));
}
}
}

private boolean isTypeLike(Type type, Type typeOther) {
if (type.isStringType()) {
return typeOther.isStringType();
} else {
return type.equals(typeOther);
}
}

private void executeWithRetry(Set<String> execPartitionNames, Map<TableIf, String> tableWithPartKey)
throws Exception {
int retryCount = 0;
Expand All @@ -336,7 +303,7 @@ private void executeWithRetry(Set<String> execPartitionNames, Map<TableIf, Strin

retryCount++;
LOG.warn("Retrying execution due to exception: {}. Attempt {}/{}, "
+ "taskId {} execPartitionNames {} lastQueryId {}, randomMillis {}",
+ "taskId {} execPartitionNames {} lastQueryId {}, randomMillis {}",
e.getMessage(), retryCount, retryTime, getTaskId(),
execPartitionNames, lastQueryId, randomMillis);
if (retryCount >= retryTime) {
Expand All @@ -361,7 +328,7 @@ private void exec(Set<String> refreshPartitionNames,
// if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set
UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
.from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE
? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey);
? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey, statementContext);
try {
executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
ctx.setExecutor(executor);
Expand Down Expand Up @@ -482,7 +449,7 @@ public void before() throws JobException {
* @throws DdlException
*/
private void beforeMTMVRefresh() throws AnalysisException, DdlException {
for (BaseTableInfo tableInfo : relation.getBaseTablesOneLevel()) {
for (BaseTableInfo tableInfo : relation.getBaseTablesOneLevelAndFromView()) {
TableIf tableIf = MTMVUtil.getTable(tableInfo);
if (tableIf instanceof MTMVBaseTableIf) {
MTMVBaseTableIf baseTableIf = (MTMVBaseTableIf) tableIf;
Expand Down Expand Up @@ -645,7 +612,7 @@ public List<String> calculateNeedRefreshPartitions(MTMVRefreshContext context)
// check if data is fresh
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId
boolean fresh = MTMVPartitionUtil.isMTMVSync(context, relation.getBaseTablesOneLevel(),
boolean fresh = MTMVPartitionUtil.isMTMVSync(context, relation.getBaseTablesOneLevelAndFromView(),
mtmv.getExcludedTriggerTables());
if (fresh) {
return Lists.newArrayList();
Expand All @@ -656,13 +623,17 @@ public List<String> calculateNeedRefreshPartitions(MTMVRefreshContext context)
}
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId
return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(context, relation.getBaseTablesOneLevel());
return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(context, relation.getBaseTablesOneLevelAndFromView());
}

public MTMVTaskContext getTaskContext() {
return taskContext;
}

public long getMtmvSchemaChangeVersion() {
return mtmvSchemaChangeVersion;
}

@Override
public String toString() {
return "MTMVTask{"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.mtmv;

import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;

import java.util.List;

public class MTMVAnalyzeQueryInfo {
private MTMVRelation relation;
private MTMVPartitionInfo mvPartitionInfo;
private List<ColumnDefinition> columnDefinitions;

public MTMVAnalyzeQueryInfo(List<ColumnDefinition> columnDefinitions, MTMVPartitionInfo mvPartitionInfo,
MTMVRelation relation) {
this.columnDefinitions = columnDefinitions;
this.mvPartitionInfo = mvPartitionInfo;
this.relation = relation;
}

public List<ColumnDefinition> getColumnDefinitions() {
return columnDefinitions;
}

public MTMVPartitionInfo getMvPartitionInfo() {
return mvPartitionInfo;
}

public MTMVRelation getRelation() {
return relation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,18 @@ public interface MTMVHookService {
* @param info
*/
void cancelMTMVTask(CancelMTMVTaskInfo info) throws DdlException, MetaNotFoundException, JobException;

/**
* Triggered when baseView is altered
*
* @param baseViewInfo
*/
void alterView(BaseTableInfo baseViewInfo);

/**
* Triggered when baseView is dropped
*
* @param baseViewInfo
*/
void dropView(BaseTableInfo baseViewInfo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,13 @@ private MTMVJob getJobByMTMV(MTMV mtmv) {
return (MTMVJob) Env.getCurrentEnv().getJobManager().getJob(mtmv.getId());
}

@Override
public void alterView(BaseTableInfo baseViewInfo) {

}

@Override
public void dropView(BaseTableInfo baseViewInfo) {

}
}
Loading
Loading