diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 597cef2d47e8c1..75353f446a4433 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids; import org.apache.doris.analysis.StatementBase; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; import org.apache.doris.catalog.constraint.TableIdentifier; @@ -178,6 +179,8 @@ public enum TableFrom { private final Map, TableIf> insertTargetTables = Maps.newHashMap(); // save view's def and sql mode to avoid them change before lock private final Map, Pair> viewInfos = Maps.newHashMap(); + // save insert into schema to avoid schema changed between two read locks + private final List insertTargetSchema = new ArrayList<>(); // for create view support in nereids // key is the start and end position of the sql substring that needs to be replaced, @@ -281,6 +284,10 @@ public Map, TableIf> getTables() { return tables; } + public List getInsertTargetSchema() { + return insertTargetSchema; + } + public void setTables(Map, TableIf> tables) { this.tables.clear(); this.tables.putAll(tables); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java index 99d7c308dacf0d..23e7b5eca762ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/generator/PlanPatternGeneratorAnalyzer.java @@ -19,7 +19,6 @@ import org.apache.doris.nereids.pattern.generator.javaast.ClassDeclaration; -import java.lang.reflect.Modifier; import java.util.List; import java.util.Map; import java.util.Optional; @@ -45,8 +44,7 @@ public String generatePatterns(String className, String parentClassName, boolean Map> planClassMap = analyzer.getParentClassMap().entrySet().stream() .filter(kv -> kv.getValue().contains("org.apache.doris.nereids.trees.plans.Plan")) .filter(kv -> !kv.getKey().name.equals("GroupPlan")) - .filter(kv -> !Modifier.isAbstract(kv.getKey().modifiers.mod) - && kv.getKey() instanceof ClassDeclaration) + .filter(kv -> kv.getKey() instanceof ClassDeclaration) .collect(Collectors.toMap(kv -> (ClassDeclaration) kv.getKey(), kv -> kv.getValue())); List generators = planClassMap.entrySet() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java index 9426ab4d382b77..92a4fb76d49aaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java @@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTE; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias; +import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.util.RelationUtil; import com.google.common.collect.ImmutableList; @@ -75,8 +76,8 @@ public List buildRules() { unboundRelation() .thenApply(this::collectFromUnboundRelation) .toRule(RuleType.COLLECT_TABLE_FROM_RELATION), - unboundTableSink() - .thenApply(this::collectFromUnboundTableSink) + unboundLogicalSink() + .thenApply(this::collectFromUnboundSink) .toRule(RuleType.COLLECT_TABLE_FROM_SINK), any().whenNot(UnboundRelation.class::isInstance) .whenNot(UnboundTableSink.class::isInstance) @@ -124,7 +125,7 @@ private Plan collectFromAny(MatchingContext ctx) { return null; } - private Plan collectFromUnboundTableSink(MatchingContext> ctx) { + private Plan collectFromUnboundSink(MatchingContext> ctx) { List nameParts = ctx.root.getNameParts(); switch (nameParts.size()) { case 1: @@ -182,6 +183,13 @@ private void collectFromUnboundRelation(CascadesContext cascadesContext, if (tableFrom == TableFrom.QUERY) { collectMTMVCandidates(table, cascadesContext); } + if (tableFrom == TableFrom.INSERT_TARGET) { + if (!cascadesContext.getStatementContext().getInsertTargetSchema().isEmpty()) { + LOG.warn("collect insert target table '{}' more than once.", tableQualifier); + } + cascadesContext.getStatementContext().getInsertTargetSchema().clear(); + cascadesContext.getStatementContext().getInsertTargetSchema().addAll(table.getFullSchema()); + } if (table instanceof View) { parseAndCollectFromView(tableQualifier, (View) table, cascadesContext); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 76c72f82f90552..39c5909d4f553d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -28,7 +28,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.profile.ProfileManager.ProfileType; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.jdbc.JdbcExternalTable; @@ -73,7 +72,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -186,9 +184,7 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExec // lock after plan and check does table's schema changed to ensure we lock table order by id. TableIf newestTargetTableIf = RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv()); - List targetTables = Lists.newArrayList(targetTableIf, newestTargetTableIf); - targetTables.sort(Comparator.comparing(TableIf::getId)); - MetaLockUtils.readLockTables(targetTables); + newestTargetTableIf.readLock(); try { if (targetTableIf.getId() != newestTargetTableIf.getId()) { LOG.warn("insert plan failed {} times. query id is {}. table id changed from {} to {}", @@ -196,10 +192,11 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExec targetTableIf.getId(), newestTargetTableIf.getId()); continue; } - if (!targetTableIf.getFullSchema().equals(newestTargetTableIf.getFullSchema())) { + // Use the schema saved during planning as the schema of the original target table. + if (!ctx.getStatementContext().getInsertTargetSchema().equals(newestTargetTableIf.getFullSchema())) { LOG.warn("insert plan failed {} times. query id is {}. table schema changed from {} to {}", retryTimes, DebugUtil.printId(ctx.queryId()), - targetTableIf.getFullSchema(), newestTargetTableIf.getFullSchema()); + ctx.getStatementContext().getInsertTargetSchema(), newestTargetTableIf.getFullSchema()); continue; } if (!insertExecutor.isEmptyInsert()) { @@ -209,9 +206,9 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExec buildResult.physicalSink ); } - MetaLockUtils.readUnlockTables(targetTables); + newestTargetTableIf.readUnlock(); } catch (Throwable e) { - MetaLockUtils.readUnlockTables(targetTables); + newestTargetTableIf.readUnlock(); // the abortTxn in onFail need to acquire table write lock if (insertExecutor != null) { insertExecutor.onFail(e);