Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.nereids;

import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.catalog.constraint.TableIdentifier;
Expand Down Expand Up @@ -178,6 +179,8 @@ public enum TableFrom {
private final Map<List<String>, TableIf> insertTargetTables = Maps.newHashMap();
// save view's def and sql mode to avoid them change before lock
private final Map<List<String>, Pair<String, Long>> viewInfos = Maps.newHashMap();
// save insert into schema to avoid schema changed between two read locks
private final List<Column> insertTargetSchema = new ArrayList<>();

// for create view support in nereids
// key is the start and end position of the sql substring that needs to be replaced,
Expand Down Expand Up @@ -281,6 +284,10 @@ public Map<List<String>, TableIf> getTables() {
return tables;
}

/**
 * Returns the target table's column schema captured while planning an INSERT,
 * kept so a later re-check can detect schema changes between two read locks
 * (see the {@code insertTargetSchema} field comment).
 * NOTE(review): the returned list is the internal mutable instance — callers
 * in this change clear() and addAll() it in place rather than replacing it.
 */
public List<Column> getInsertTargetSchema() {
return insertTargetSchema;
}

public void setTables(Map<List<String>, TableIf> tables) {
this.tables.clear();
this.tables.putAll(tables);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.nereids.pattern.generator.javaast.ClassDeclaration;

import java.lang.reflect.Modifier;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -45,8 +44,7 @@ public String generatePatterns(String className, String parentClassName, boolean
Map<ClassDeclaration, Set<String>> planClassMap = analyzer.getParentClassMap().entrySet().stream()
.filter(kv -> kv.getValue().contains("org.apache.doris.nereids.trees.plans.Plan"))
.filter(kv -> !kv.getKey().name.equals("GroupPlan"))
.filter(kv -> !Modifier.isAbstract(kv.getKey().modifiers.mod)
&& kv.getKey() instanceof ClassDeclaration)
.filter(kv -> kv.getKey() instanceof ClassDeclaration)
.collect(Collectors.toMap(kv -> (ClassDeclaration) kv.getKey(), kv -> kv.getValue()));

List<PlanPatternGenerator> generators = planClassMap.entrySet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.util.RelationUtil;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -75,8 +76,8 @@ public List<Rule> buildRules() {
unboundRelation()
.thenApply(this::collectFromUnboundRelation)
.toRule(RuleType.COLLECT_TABLE_FROM_RELATION),
unboundTableSink()
.thenApply(this::collectFromUnboundTableSink)
unboundLogicalSink()
.thenApply(this::collectFromUnboundSink)
.toRule(RuleType.COLLECT_TABLE_FROM_SINK),
any().whenNot(UnboundRelation.class::isInstance)
.whenNot(UnboundTableSink.class::isInstance)
Expand Down Expand Up @@ -124,7 +125,7 @@ private Plan collectFromAny(MatchingContext<Plan> ctx) {
return null;
}

private Plan collectFromUnboundTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
private Plan collectFromUnboundSink(MatchingContext<UnboundLogicalSink<Plan>> ctx) {
List<String> nameParts = ctx.root.getNameParts();
switch (nameParts.size()) {
case 1:
Expand Down Expand Up @@ -182,6 +183,13 @@ private void collectFromUnboundRelation(CascadesContext cascadesContext,
if (tableFrom == TableFrom.QUERY) {
collectMTMVCandidates(table, cascadesContext);
}
if (tableFrom == TableFrom.INSERT_TARGET) {
if (!cascadesContext.getStatementContext().getInsertTargetSchema().isEmpty()) {
LOG.warn("collect insert target table '{}' more than once.", tableQualifier);
}
cascadesContext.getStatementContext().getInsertTargetSchema().clear();
cascadesContext.getStatementContext().getInsertTargetSchema().addAll(table.getFullSchema());
}
if (table instanceof View) {
parseAndCollectFromView(tableQualifier, (View) table, cascadesContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ProfileManager.ProfileType;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
Expand Down Expand Up @@ -73,7 +72,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -186,20 +184,19 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExec

// lock after plan and check does table's schema changed to ensure we lock table order by id.
TableIf newestTargetTableIf = RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv());
List<TableIf> targetTables = Lists.newArrayList(targetTableIf, newestTargetTableIf);
targetTables.sort(Comparator.comparing(TableIf::getId));
MetaLockUtils.readLockTables(targetTables);
newestTargetTableIf.readLock();
try {
if (targetTableIf.getId() != newestTargetTableIf.getId()) {
LOG.warn("insert plan failed {} times. query id is {}. table id changed from {} to {}",
retryTimes, DebugUtil.printId(ctx.queryId()),
targetTableIf.getId(), newestTargetTableIf.getId());
continue;
}
if (!targetTableIf.getFullSchema().equals(newestTargetTableIf.getFullSchema())) {
// Use the schema saved during planning as the schema of the original target table.
if (!ctx.getStatementContext().getInsertTargetSchema().equals(newestTargetTableIf.getFullSchema())) {
LOG.warn("insert plan failed {} times. query id is {}. table schema changed from {} to {}",
retryTimes, DebugUtil.printId(ctx.queryId()),
targetTableIf.getFullSchema(), newestTargetTableIf.getFullSchema());
ctx.getStatementContext().getInsertTargetSchema(), newestTargetTableIf.getFullSchema());
continue;
}
if (!insertExecutor.isEmptyInsert()) {
Expand All @@ -209,9 +206,9 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor stmtExec
buildResult.physicalSink
);
}
MetaLockUtils.readUnlockTables(targetTables);
newestTargetTableIf.readUnlock();
} catch (Throwable e) {
MetaLockUtils.readUnlockTables(targetTables);
newestTargetTableIf.readUnlock();
// the abortTxn in onFail need to acquire table write lock
if (insertExecutor != null) {
insertExecutor.onFail(e);
Expand Down
Loading