From 91f8d4e791b82fdc2a3528c34d4ad5108123ec4d Mon Sep 17 00:00:00 2001 From: morrySnow Date: Fri, 24 Jan 2025 14:56:28 +0800 Subject: [PATCH] branch-2.1: [fix](Nereids) Use the schema saved during planning as the schema of the original target table #47337 pick from master #47337 Related PR: #47033 #45045 Problem Summary: because schema change does not involve recreating the table object, but rather rebuilding the full schema. So, we should use the schema saved during planning as the schema of the original target table. --- .../apache/doris/nereids/StatementContext.java | 7 +++++++ .../generator/PlanPatternGeneratorAnalyzer.java | 4 +--- .../nereids/rules/analysis/CollectRelation.java | 14 +++++++++++--- .../commands/insert/InsertIntoTableCommand.java | 16 ++++++---------- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index cff14675bf03f2..d33832a4f958c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids; import org.apache.doris.analysis.StatementBase; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; import org.apache.doris.catalog.constraint.TableIdentifier; @@ -179,6 +180,8 @@ public enum TableFrom { private final Map, TableIf> insertTargetTables = Maps.newHashMap(); // save view's def to avoid them change before lock private final Map, String> viewInfos = Maps.newHashMap(); + // save insert into schema to avoid schema changed between two read locks + private final List insertTargetSchema = new ArrayList<>(); // for create view support in nereids // key is the start and end position of the sql substring that needs to be replaced, @@ -276,6 +279,10 @@ public Map, TableIf> getTables() { return tables; } + public List getInsertTargetSchema() { + return insertTargetSchema; + } + public void setTables(Map, TableIf> tables) { this.tables.clear(); this.tables.putAll(tables); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java index 99d7c308dacf0d..23e7b5eca762ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java @@ -19,7 +19,6 @@ import org.apache.doris.nereids.pattern.generator.javaast.ClassDeclaration; -import java.lang.reflect.Modifier; import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,8 +44,7 @@ public String generatePatterns(String className, String parentClassName, boolean Map> planClassMap = analyzer.getParentClassMap().entrySet().stream() .filter(kv -> kv.getValue().contains("org.apache.doris.nereids.trees.plans.Plan")) .filter(kv -> !kv.getKey().name.equals("GroupPlan")) - .filter(kv -> !Modifier.isAbstract(kv.getKey().modifiers.mod) - && kv.getKey() instanceof ClassDeclaration) + .filter(kv -> kv.getKey() instanceof ClassDeclaration) .collect(Collectors.toMap(kv -> (ClassDeclaration) kv.getKey(), kv -> kv.getValue())); List generators = planClassMap.entrySet() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java index 67edbe4f4ced09..24a85356aba18e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java @@ -40,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTE; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias; +import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.util.RelationUtil; import com.google.common.collect.ImmutableList; @@ -74,8 +75,8 @@ public List buildRules() { unboundRelation() .thenApply(this::collectFromUnboundRelation) .toRule(RuleType.COLLECT_TABLE_FROM_RELATION), - unboundTableSink() - .thenApply(this::collectFromUnboundTableSink) + unboundLogicalSink() + .thenApply(this::collectFromUnboundSink) .toRule(RuleType.COLLECT_TABLE_FROM_SINK), any().whenNot(UnboundRelation.class::isInstance) .whenNot(UnboundTableSink.class::isInstance) @@ -123,7 +124,7 @@ private Plan collectFromAny(MatchingContext ctx) { return null; } - private Plan collectFromUnboundTableSink(MatchingContext> ctx) { + private Plan collectFromUnboundSink(MatchingContext> ctx) { List nameParts = ctx.root.getNameParts(); switch (nameParts.size()) { case 1: @@ -181,6 +182,13 @@ private void collectFromUnboundRelation(CascadesContext cascadesContext, if (tableFrom == TableFrom.QUERY) { collectMTMVCandidates(table, cascadesContext); } + if (tableFrom == TableFrom.INSERT_TARGET) { + if (!cascadesContext.getStatementContext().getInsertTargetSchema().isEmpty()) { + LOG.warn("collect insert target table '{}' more than once.", tableQualifier); + } + cascadesContext.getStatementContext().getInsertTargetSchema().clear(); + cascadesContext.getStatementContext().getInsertTargetSchema().addAll(table.getFullSchema()); + } if (table instanceof View) { parseAndCollectFromView(tableQualifier, (View) table, cascadesContext); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 82de92251def28..2f36d26cd1b66c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -24,7 +24,6 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.ProfileManager.ProfileType; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalTable; @@ -59,11 +58,9 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -181,9 +178,7 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor // lock after plan and check does table's schema changed to ensure we lock table order by id. TableIf newestTargetTableIf = RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv()); - List targetTables = Lists.newArrayList(targetTableIf, newestTargetTableIf); - targetTables.sort(Comparator.comparing(TableIf::getId)); - MetaLockUtils.readLockTables(targetTables); + newestTargetTableIf.readLock(); try { if (targetTableIf.getId() != newestTargetTableIf.getId()) { LOG.warn("insert plan failed {} times. query id is {}. table id changed from {} to {}", @@ -191,10 +186,11 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor targetTableIf.getId(), newestTargetTableIf.getId()); continue; } - if (!targetTableIf.getFullSchema().equals(newestTargetTableIf.getFullSchema())) { + // Use the schema saved during planning as the schema of the original target table. + if (!ctx.getStatementContext().getInsertTargetSchema().equals(newestTargetTableIf.getFullSchema())) { LOG.warn("insert plan failed {} times. query id is {}. table schema changed from {} to {}", retryTimes, DebugUtil.printId(ctx.queryId()), - targetTableIf.getFullSchema(), newestTargetTableIf.getFullSchema()); + ctx.getStatementContext().getInsertTargetSchema(), newestTargetTableIf.getFullSchema()); continue; } if (ctx.getConnectType() == ConnectType.MYSQL && ctx.getMysqlChannel() != null) { @@ -207,9 +203,9 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor buildResult.physicalSink ); } - MetaLockUtils.readUnlockTables(targetTables); + newestTargetTableIf.readUnlock(); } catch (Throwable e) { - MetaLockUtils.readUnlockTables(targetTables); + newestTargetTableIf.readUnlock(); // the abortTxn in onFail need to acquire table write lock if (insertExecutor != null) { insertExecutor.onFail(e);