Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ statement
(ROLLUP LEFT_PAREN rollupDefs RIGHT_PAREN)?
propertyClause?
(AS query)? #createTable
| explain? INSERT (INTO | OVERWRITE TABLE) tableName=multipartIdentifier
| explain? INSERT IGNORE? (INTO | OVERWRITE TABLE) tableName=multipartIdentifier
(PARTITION partition=identifierList)? // partition define
(WITH LABEL labelName=identifier)? cols=identifierList? // label and columns define
(LEFT_BRACKET hints=identifierSeq RIGHT_BRACKET)? // hint define
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,37 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
private final List<String> partitions;
private final boolean isPartialUpdate;
private final boolean isFromNativeInsertStmt;
private final boolean isIgnoreMode;

public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, false, false, Optional.empty(), Optional.empty(), child);
this(nameParts, colNames, hints, partitions, false, false, false, Optional.empty(), Optional.empty(), child);
}

public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, boolean isPartialUpdate, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, isPartialUpdate, false,
this(nameParts, colNames, hints, partitions, isPartialUpdate, false, false,
Optional.empty(), Optional.empty(), child);
}

public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, boolean isPartialUpdate, boolean isFromNativeInsertStmt, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, isPartialUpdate, isFromNativeInsertStmt,
this(nameParts, colNames, hints, partitions, isPartialUpdate, isFromNativeInsertStmt, false,
Optional.empty(), Optional.empty(), child);
}

public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, boolean isPartialUpdate, boolean isFromNativeInsertStmt, boolean isIgnoreMode,
CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, isPartialUpdate, isFromNativeInsertStmt, isIgnoreMode,
Optional.empty(), Optional.empty(), child);
}

/**
* constructor
*/
public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, boolean isPartialUpdate, boolean isFromNativeInsertStmt,
List<String> partitions, boolean isPartialUpdate, boolean isFromNativeInsertStmt, boolean isIgnoreMode,
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, child);
Expand All @@ -80,6 +88,7 @@ public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<
this.partitions = Utils.copyRequiredList(partitions);
this.isPartialUpdate = isPartialUpdate;
this.isFromNativeInsertStmt = isFromNativeInsertStmt;
this.isIgnoreMode = isIgnoreMode;
}

public List<String> getColNames() {
Expand All @@ -106,11 +115,16 @@ public boolean isFromNativeInsertStmt() {
return isFromNativeInsertStmt;
}

public boolean isIgnoreMode() {
return isIgnoreMode;
}

@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child");
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, isPartialUpdate,
isFromNativeInsertStmt, groupExpression, Optional.of(getLogicalProperties()), children.get(0));
isFromNativeInsertStmt, isIgnoreMode, groupExpression, Optional.of(getLogicalProperties()),
children.get(0));
}

@Override
Expand Down Expand Up @@ -146,14 +160,15 @@ public int hashCode() {
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, isPartialUpdate,
isFromNativeInsertStmt, groupExpression, Optional.of(getLogicalProperties()), child());
isFromNativeInsertStmt, isIgnoreMode, groupExpression, Optional.of(getLogicalProperties()), child());
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions,
isPartialUpdate, isFromNativeInsertStmt, groupExpression, logicalProperties, children.get(0));
isPartialUpdate, isFromNativeInsertStmt, isIgnoreMode, groupExpression,
logicalProperties, children.get(0));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ public LogicalPlan visitStatementDefault(StatementDefaultContext ctx) {
@Override
public LogicalPlan visitInsertIntoQuery(InsertIntoQueryContext ctx) {
boolean isOverwrite = ctx.INTO() == null;
boolean isIgnoreMode = ctx.IGNORE() != null;
List<String> tableName = visitMultipartIdentifier(ctx.tableName);
String labelName = ctx.labelName == null ? null : ctx.labelName.getText();
List<String> colNames = ctx.cols == null ? ImmutableList.of() : visitIdentifierList(ctx.cols);
Expand All @@ -413,6 +414,7 @@ public LogicalPlan visitInsertIntoQuery(InsertIntoQueryContext ctx) {
partitions,
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(),
true,
isIgnoreMode,
visitQuery(ctx.query()));
if (ctx.explain() != null) {
return withExplain(sink, ctx.explain());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
Expand Down Expand Up @@ -75,6 +76,20 @@ public List<Rule> buildRules() {

LogicalPlan child = ((LogicalPlan) sink.child());

if (sink.isIgnoreMode()) {
if (table.getKeysType() != KeysType.UNIQUE_KEYS
|| !table.getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("ignore mode can only be enabled if the target"
+ " table is a unique table with merge-on-write enabled.");
} else if (sink.isPartialUpdate()) {
throw new AnalysisException("ignore mode can't be used in partial update.");
} else if (table.hasSequenceCol()) {
throw new AnalysisException("ignore mode can't be used if the target table has"
+ " sequence column, but table[" + table.getName()
+ "] has sequnce column.");
}
}

LogicalOlapTableSink<?> boundSink = new LogicalOlapTableSink<>(
database,
table,
Expand All @@ -85,6 +100,7 @@ public List<Rule> buildRules() {
.collect(ImmutableList.toImmutableList()),
sink.isPartialUpdate(),
sink.isFromNativeInsertStmt(),
sink.isIgnoreMode(),
sink.child());

// we need to insert all the columns of the target table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public Rule build() {
ctx.connectContext.getSessionVariable().isEnableSingleReplicaInsert(),
sink.isPartialUpdate(),
sink.isFromNativeInsertStmt(),
sink.isIgnoreMode(),
Optional.empty(),
sink.getLogicalProperties(),
sink.child());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
ctx.getSessionVariable().getSendBatchParallelism(),
false,
isStrictMode,
false);
physicalOlapTableSink.isIgnoreMode());

sink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,28 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
private final List<Long> partitionIds;
private final boolean isPartialUpdate;
private final boolean isFromNativeInsertStmt;
private final boolean isIgnoreMode;

public LogicalOlapTableSink(Database database, OlapTable targetTable, List<Column> cols, List<Long> partitionIds,
List<NamedExpression> outputExprs, boolean isPartialUpdate, boolean isFromNativeInsertStmt,
CHILD_TYPE child) {
this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, isFromNativeInsertStmt,
this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, isFromNativeInsertStmt, false,
Optional.empty(), Optional.empty(), child);
}

public LogicalOlapTableSink(Database database, OlapTable targetTable, List<Column> cols, List<Long> partitionIds,
List<NamedExpression> outputExprs, boolean isPartialUpdate, boolean isFromNativeInsertStmt,
boolean isIgnoreMode, CHILD_TYPE child) {
this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, isFromNativeInsertStmt,
isIgnoreMode, Optional.empty(), Optional.empty(), child);
}

/**
* constructor
*/
public LogicalOlapTableSink(Database database, OlapTable targetTable, List<Column> cols,
List<Long> partitionIds, List<NamedExpression> outputExprs, boolean isPartialUpdate,
boolean isFromNativeInsertStmt, Optional<GroupExpression> groupExpression,
boolean isFromNativeInsertStmt, boolean isIgnoreMode, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child);
this.database = Objects.requireNonNull(database, "database != null in LogicalOlapTableSink");
Expand All @@ -69,21 +77,22 @@ public LogicalOlapTableSink(Database database, OlapTable targetTable, List<Colum
this.isPartialUpdate = isPartialUpdate;
this.isFromNativeInsertStmt = isFromNativeInsertStmt;
this.partitionIds = Utils.copyRequiredList(partitionIds);
this.isIgnoreMode = isIgnoreMode;
}

public Plan withChildAndUpdateOutput(Plan child) {
List<NamedExpression> output = child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList());
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, output, isPartialUpdate,
isFromNativeInsertStmt, Optional.empty(), Optional.empty(), child);
isFromNativeInsertStmt, isIgnoreMode, Optional.empty(), Optional.empty(), child);
}

@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "LogicalOlapTableSink only accepts one child");
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate,
isFromNativeInsertStmt, Optional.empty(), Optional.empty(), children.get(0));
isFromNativeInsertStmt, isIgnoreMode, Optional.empty(), Optional.empty(), children.get(0));
}

public Database getDatabase() {
Expand All @@ -110,6 +119,10 @@ public boolean isFromNativeInsertStmt() {
return isFromNativeInsertStmt;
}

public boolean isIgnoreMode() {
return isIgnoreMode;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -123,6 +136,7 @@ public boolean equals(Object o) {
}
LogicalOlapTableSink<?> that = (LogicalOlapTableSink<?>) o;
return isPartialUpdate == that.isPartialUpdate && isFromNativeInsertStmt == that.isFromNativeInsertStmt
&& isIgnoreMode == that.isIgnoreMode
&& Objects.equals(database, that.database)
&& Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols)
&& Objects.equals(partitionIds, that.partitionIds);
Expand All @@ -131,7 +145,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds,
isPartialUpdate, isFromNativeInsertStmt);
isPartialUpdate, isFromNativeInsertStmt, isIgnoreMode);
}

@Override
Expand All @@ -143,7 +157,8 @@ public String toString() {
"cols", cols,
"partitionIds", partitionIds,
"isPartialUpdate", isPartialUpdate,
"isFromNativeInsertStmt", isFromNativeInsertStmt
"isFromNativeInsertStmt", isFromNativeInsertStmt,
"isIgnoreMode", isIgnoreMode
);
}

Expand All @@ -155,13 +170,13 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate,
isFromNativeInsertStmt, groupExpression, Optional.of(getLogicalProperties()), child());
isFromNativeInsertStmt, isIgnoreMode, groupExpression, Optional.of(getLogicalProperties()), child());
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate,
isFromNativeInsertStmt, groupExpression, logicalProperties, children.get(0));
isFromNativeInsertStmt, isIgnoreMode, groupExpression, logicalProperties, children.get(0));
}
}
Loading