Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.analyzer;

import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Optional;

/**
* Represent an jdbc table sink plan node that has not been bound.
*/
public class UnboundJdbcTableSink<CHILD_TYPE extends Plan> extends UnboundBaseExternalTableSink<CHILD_TYPE> {

public UnboundJdbcTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, DMLCommandType.NONE,
Optional.empty(), Optional.empty(), child);
}

/**
* constructor
*/
public UnboundJdbcTableSink(List<String> nameParts,
List<String> colNames,
List<String> hints,
List<String> partitions,
DMLCommandType dmlCommandType,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(nameParts, PlanType.LOGICAL_UNBOUND_JDBC_TABLE_SINK, ImmutableList.of(), groupExpression,
logicalProperties, colNames, dmlCommandType, child, hints, partitions);
}

@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"UnboundJdbcTableSink should have exactly one child");
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), children.get(0));
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitUnboundJdbcTableSink(this, context);
}

@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, logicalProperties, children.get(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.exceptions.ParseException;
import org.apache.doris.nereids.trees.plans.Plan;
Expand Down Expand Up @@ -78,6 +79,9 @@ public static LogicalSink<? extends Plan> createUnboundTableSink(List<String> na
} else if (curCatalog instanceof IcebergExternalCatalog) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof JdbcExternalCatalog) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
}
throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported.");
}
Expand Down Expand Up @@ -109,20 +113,16 @@ public static LogicalSink<? extends Plan> createUnboundTableSinkMaybeOverwrite(L
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof IcebergExternalCatalog && !isAutoDetectPartition) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
}
// TODO: we need to support insert into other catalog
try {
if (ConnectContext.get() != null) {
ConnectContext.get().getSessionVariable().enableFallbackToOriginalPlannerOnce();
}
} catch (Exception e) {
// ignore this.
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof JdbcExternalCatalog) {
return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
}

throw new AnalysisException(
(isOverwrite ? "insert overwrite" : "insert") + " data to " + curCatalog.getClass().getSimpleName()
+ " is not supported."
+ (isAutoDetectPartition
? " PARTITION(*) is only supported in overwrite partition for OLAP table" : ""));
? " PARTITION(*) is only supported in overwrite partition for OLAP table" : ""));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.datasource.jdbc.sink.JdbcTableSink;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
import org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode;
Expand Down Expand Up @@ -129,6 +130,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
Expand Down Expand Up @@ -498,6 +500,24 @@ public PlanFragment visitPhysicalIcebergTableSink(PhysicalIcebergTableSink<? ext
return rootFragment;
}

@Override
public PlanFragment visitPhysicalJdbcTableSink(PhysicalJdbcTableSink<? extends Plan> jdbcTableSink,
PlanTranslatorContext context) {
PlanFragment rootFragment = jdbcTableSink.child().accept(this, context);
rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
List<Column> targetTableColumns = jdbcTableSink.getCols();
List<String> insertCols = targetTableColumns.stream()
.map(Column::getName)
.collect(Collectors.toList());

JdbcTableSink sink = new JdbcTableSink(
((JdbcExternalTable) jdbcTableSink.getTargetTable()).getJdbcTable(),
insertCols
);
rootFragment.setSink(sink);
return rootFragment;
}

@Override
public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink,
PlanTranslatorContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
Expand Down Expand Up @@ -67,6 +68,13 @@ public Plan visitLogicalIcebergTableSink(
return tableSink;
}

@Override
public Plan visitLogicalJdbcTableSink(
LogicalJdbcTableSink<? extends Plan> tableSink, StatementContext context) {
turnOffPageCache(context);
return tableSink;
}

private void turnOffPageCache(StatementContext context) {
SessionVariable sessionVariable = context.getConnectContext().getSessionVariable();
// set temporary session value, and then revert value in the 'finally block' of StmtExecutor#execute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
Expand Down Expand Up @@ -152,6 +153,14 @@ public Void visitPhysicalIcebergTableSink(
return null;
}

@Override
public Void visitPhysicalJdbcTableSink(
PhysicalJdbcTableSink<? extends Plan> jdbcTableSink, PlanContext context) {
// Always use gather properties for jdbcTableSink
addRequestPropertyToChildren(PhysicalProperties.GATHER);
return null;
}

@Override
public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, PlanContext context) {
if (context.getSessionVariable().enableParallelResultSink()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan;
import org.apache.doris.nereids.rules.implementation.LogicalJdbcTableSinkToPhysicalJdbcTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin;
import org.apache.doris.nereids.rules.implementation.LogicalJoinToNestedLoopJoin;
import org.apache.doris.nereids.rules.implementation.LogicalLimitToPhysicalLimit;
Expand Down Expand Up @@ -199,6 +200,7 @@ public class RuleSet {
.add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
.add(new LogicalHiveTableSinkToPhysicalHiveTableSink())
.add(new LogicalIcebergTableSinkToPhysicalIcebergTableSink())
.add(new LogicalJdbcTableSinkToPhysicalJdbcTableSink())
.add(new LogicalFileSinkToPhysicalFileSink())
.add(new LogicalResultSinkToPhysicalResultSink())
.add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public enum RuleType {
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_JDBC_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
INIT_MATERIALIZATION_HOOK_FOR_FILE_SINK(RuleTypeClass.REWRITE),
INIT_MATERIALIZATION_HOOK_FOR_TABLE_SINK(RuleTypeClass.REWRITE),
Expand Down Expand Up @@ -438,6 +439,7 @@ public enum RuleType {
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_JDBC_TABLE_SINK_TO_PHYSICAL_JDBC_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
Expand All @@ -58,6 +61,7 @@
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
Expand Down Expand Up @@ -109,7 +113,8 @@ public List<Rule> buildRules() {
// TODO: bind hive taget table
RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink)),
RuleType.BINDING_INSERT_ICEBERG_TABLE.build(
unboundIcebergTableSink().thenApply(this::bindIcebergTableSink))
unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)),
RuleType.BINDING_INSERT_JDBC_TABLE.build(unboundJdbcTableSink().thenApply(this::bindJdbcTableSink))
);
}

Expand Down Expand Up @@ -524,6 +529,64 @@ private Plan bindIcebergTableSink(MatchingContext<UnboundIcebergTableSink<Plan>>
return boundSink.withChildAndUpdateOutput(fullOutputProject);
}

private Plan bindJdbcTableSink(MatchingContext<UnboundJdbcTableSink<Plan>> ctx) {
UnboundJdbcTableSink<?> sink = ctx.root;
Pair<JdbcExternalDatabase, JdbcExternalTable> pair = bind(ctx.cascadesContext, sink);
JdbcExternalDatabase database = pair.first;
JdbcExternalTable table = pair.second;
LogicalPlan child = ((LogicalPlan) sink.child());

List<Column> bindColumns;
if (sink.getColNames().isEmpty()) {
bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList());
} else {
bindColumns = sink.getColNames().stream().map(cn -> {
Column column = table.getColumn(cn);
if (column == null) {
throw new AnalysisException(String.format("column %s is not found in table %s",
cn, table.getName()));
}
return column;
}).collect(ImmutableList.toImmutableList());
}
LogicalJdbcTableSink<?> boundSink = new LogicalJdbcTableSink<>(
database,
table,
bindColumns,
child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
sink.getDMLCommandType(),
Optional.empty(),
Optional.empty(),
child);
// we need to insert all the columns of the target table
if (boundSink.getCols().size() != child.getOutput().size()) {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
Map<String, NamedExpression> columnToOutput = getJdbcColumnToOutput(bindColumns, child);
// We don't need to insert unmentioned columns, only user specified columns
LogicalProject<?> outputProject = getOutputProjectByCoercion(bindColumns, child, columnToOutput);
return boundSink.withChildAndUpdateOutput(outputProject);
}

private static Map<String, NamedExpression> getJdbcColumnToOutput(
List<Column> bindColumns, LogicalPlan child) {
Map<String, NamedExpression> columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);

for (int i = 0; i < bindColumns.size(); i++) {
Column column = bindColumns.get(i);
NamedExpression outputExpr = child.getOutput().get(i);
Alias output = new Alias(
TypeCoercionUtils.castIfNotSameType(outputExpr, DataType.fromCatalogType(column.getType())),
column.getName()
);
columnToOutput.put(column.getName(), output);
}

return columnToOutput;
}

private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, UnboundTableSink<? extends Plan> sink) {
List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
sink.getNameParts());
Expand Down Expand Up @@ -567,6 +630,18 @@ private Pair<IcebergExternalDatabase, IcebergExternalTable> bind(CascadesContext
throw new AnalysisException("the target table of insert into is not an iceberg table");
}

private Pair<JdbcExternalDatabase, JdbcExternalTable> bind(CascadesContext cascadesContext,
UnboundJdbcTableSink<? extends Plan> sink) {
List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
sink.getNameParts());
Pair<DatabaseIf<?>, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier,
cascadesContext.getConnectContext().getEnv());
if (pair.second instanceof JdbcExternalTable) {
return Pair.of(((JdbcExternalDatabase) pair.first), (JdbcExternalTable) pair.second);
}
throw new AnalysisException("the target table of insert into is not an jdbc table");
}

private List<Long> bindPartitionIds(OlapTable table, List<String> partitions, boolean temp) {
return partitions.isEmpty()
? ImmutableList.of()
Expand Down
Loading