diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 49ca6721f498be..6f79afd5de5d7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -22,6 +22,7 @@ import org.apache.doris.datasource.SessionContext; import org.apache.doris.datasource.operations.ExternalMetadataOperations; import org.apache.doris.datasource.property.PropertyConverter; +import org.apache.doris.transaction.TransactionManagerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.Constants; @@ -51,7 +52,9 @@ public IcebergExternalCatalog(long catalogId, String name, String comment) { @Override protected void initLocalObjectsImpl() { initCatalog(); - metadataOps = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog); + IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog); + transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops); + metadataOps = ops; } public Catalog getCatalog() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index de9c3814fd68b9..06864bfe6a6e9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -28,9 +28,14 @@ import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Table; + import java.util.HashMap; import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; public class IcebergExternalTable extends ExternalTable { @@ -83,4 +88,15 @@ public long fetchRowCount() { makeSureInitialized(); return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName()); } + + public Table getIcebergTable() { + return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName()); + } + + @Override + public Set getPartitionNames() { + // reuse getIcebergTable() instead of loading the table twice + return getIcebergTable() + .spec().fields().stream().map(PartitionField::name).collect(Collectors.toSet()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java new file mode 100644 index 00000000000000..2e1ad8ab1e1a2e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +// and modified by Doris + +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.common.UserException; +import org.apache.doris.thrift.TFileContent; +import org.apache.doris.thrift.TIcebergCommitData; +import org.apache.doris.transaction.Transaction; + +import com.google.common.base.VerifyException; +import com.google.common.collect.Lists; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +public class IcebergTransaction implements Transaction { + + private static final Logger LOG = LogManager.getLogger(IcebergTransaction.class); + private final IcebergMetadataOps ops; + private org.apache.iceberg.Transaction transaction; + private final List commitDataList = Lists.newArrayList(); + + public IcebergTransaction(IcebergMetadataOps ops) { + this.ops = ops; + } + + public void updateIcebergCommitData(List commitDataList) { + synchronized (this) { + this.commitDataList.addAll(commitDataList); + } + } + + public void beginInsert(String dbName, String tbName) { + Table icebergTable = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName)); + transaction = icebergTable.newTransaction(); + } + + public void finishInsert() { + Table icebergTable = transaction.table(); + AppendFiles appendFiles = transaction.newAppend(); + + for (CommitTaskData task : convertToCommitTaskData()) { + DataFiles.Builder builder = DataFiles.builder(icebergTable.spec()) + .withPath(task.getPath()) + .withFileSizeInBytes(task.getFileSizeInBytes()) + .withFormat(IcebergUtils.getFileFormat(icebergTable)) + .withMetrics(task.getMetrics()); + + if (icebergTable.spec().isPartitioned()) { + List partitionValues = task.getPartitionValues() + .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); + builder.withPartitionValues(partitionValues); + } + appendFiles.appendFile(builder.build()); + } + + // appendFiles.commit() generates the new metadata (manifest and snapshot). + // After it returns, the new snapshot is already visible within the current transaction. + appendFiles.commit(); + } + + public List convertToCommitTaskData() { + List commitTaskData = new ArrayList<>(); + for (TIcebergCommitData data : this.commitDataList) { + commitTaskData.add(new CommitTaskData( + data.getFilePath(), + data.getFileSize(), + new Metrics( + data.getRowCount(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ), + data.isSetPartitionValues() ? Optional.of(data.getPartitionValues()) : Optional.empty(), + convertToFileContent(data.getFileContent()), + data.isSetReferencedDataFiles() ?
Optional.of(data.getReferencedDataFiles()) : Optional.empty() + )); + } + return commitTaskData; + } + + private FileContent convertToFileContent(TFileContent content) { + if (content.equals(TFileContent.DATA)) { + return FileContent.DATA; + } else if (content.equals(TFileContent.POSITION_DELETES)) { + return FileContent.POSITION_DELETES; + } else { + return FileContent.EQUALITY_DELETES; + } + } + + @Override + public void commit() throws UserException { + // Externally readable + // Manipulate the relevant data so that others can also see the latest table, such as: + // 1. hadoop: it will change the version number information in 'version-hint.text' + // 2. hive: it will change the table properties, the most important thing is to revise 'metadata_location' + // 3. and so on ... + transaction.commitTransaction(); + } + + @Override + public void rollback() { + + } + + public long getUpdateCnt() { + return commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum(); + } + + public static class CommitTaskData { + private final String path; + private final long fileSizeInBytes; + private final Metrics metrics; + private final Optional> partitionValues; + private final FileContent content; + private final Optional> referencedDataFiles; + + public CommitTaskData(String path, + long fileSizeInBytes, + Metrics metrics, + Optional> partitionValues, + FileContent content, + Optional> referencedDataFiles) { + this.path = path; + this.fileSizeInBytes = fileSizeInBytes; + this.metrics = metrics; + this.partitionValues = partitionValues; + this.content = content; + this.referencedDataFiles = referencedDataFiles; + } + + public String getPath() { + return path; + } + + public long getFileSizeInBytes() { + return fileSizeInBytes; + } + + public Metrics getMetrics() { + return metrics; + } + + public Optional> getPartitionValues() { + return partitionValues; + } + + public FileContent getContent() { + return content; + } + + public Optional> getReferencedDataFiles() { + return referencedDataFiles; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index 05519f84595706..9c57fc8e940335 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -46,6 +46,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; +import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.thrift.TExprOpcode; import com.google.common.collect.Lists; @@ -53,6 +54,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.expressions.And; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -61,6 +63,7 @@ import org.apache.iceberg.expressions.Unbound; import org.apache.iceberg.types.Type.TypeID; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.LocationUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -90,6 +93,13 @@ public Integer initialValue() { public static final String TOTAL_POSITION_DELETES = "total-position-deletes"; public static final String TOTAL_EQUALITY_DELETES = 
"total-equality-deletes"; + // nickname in flink and spark + public static final String WRITE_FORMAT = "write-format"; + public static final String COMPRESSION_CODEC = "compression-codec"; + + // nickname in spark + public static final String SPARK_SQL_COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec"; + public static Expression convertToIcebergExpr(Expr expr, Schema schema) { if (expr == null) { return null; @@ -573,4 +583,51 @@ public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, St } return -1; } + + public static String getFileFormat(Table table) { + Map properties = table.properties(); + if (properties.containsKey(WRITE_FORMAT)) { + return properties.get(WRITE_FORMAT); + } + if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) { + return properties.get(TableProperties.DEFAULT_FILE_FORMAT); + } + return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + } + + public static String getFileCompress(Table table) { + Map properties = table.properties(); + if (properties.containsKey(COMPRESSION_CODEC)) { + return properties.get(COMPRESSION_CODEC); + } else if (properties.containsKey(SPARK_SQL_COMPRESSION_CODEC)) { + return properties.get(SPARK_SQL_COMPRESSION_CODEC); + } + String fileFormat = getFileFormat(table); + if (fileFormat.equalsIgnoreCase("parquet")) { + return properties.getOrDefault( + TableProperties.PARQUET_COMPRESSION, TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); + } else if (fileFormat.equalsIgnoreCase("orc")) { + return properties.getOrDefault( + TableProperties.ORC_COMPRESSION, TableProperties.ORC_COMPRESSION_DEFAULT); + } + throw new NotSupportedException("Unsupported file format: " + fileFormat); + } + + public static String dataLocation(Table table) { + Map properties = table.properties(); + if (properties.containsKey(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)) { + throw new NotSupportedException( + "Table " + table.name() + " specifies " + properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL) + + " as a location provider. 
" + + "Writing to Iceberg tables with custom location provider is not supported."); + } + String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION); + if (dataLocation == null) { + dataLocation = properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION); + if (dataLocation == null) { + dataLocation = String.format("%s/data", LocationUtil.stripTrailingSlash(table.location())); + } + } + return dataLocation; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java index 42e0d709e05757..e590e918344d7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java @@ -24,11 +24,11 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.planner.ColumnRange; import org.apache.doris.thrift.TFileAttributes; import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; import java.util.Map; @@ -61,14 +61,7 @@ public TupleDescriptor getDesc() { @Override public String getFileFormat() { - Map properties = originTable.properties(); - if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) { - return properties.get(TableProperties.DEFAULT_FILE_FORMAT); - } - if (properties.containsKey(FLINK_WRITE_FORMAT)) { - return properties.get(FLINK_WRITE_FORMAT); - } - return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + return IcebergUtils.getFileFormat(originTable); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java index 632120e5c452ec..06b785a15f890a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java @@ -26,12 +26,11 @@ import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.source.HiveScanNode; +import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.planner.ColumnRange; import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileTextScanRangeParams; -import org.apache.iceberg.TableProperties; - import java.util.Map; public class IcebergHMSSource implements IcebergSource { @@ -59,14 +58,7 @@ public TupleDescriptor getDesc() { @Override public String getFileFormat() throws DdlException, MetaNotFoundException { - Map properties = hmsTable.getRemoteTable().getParameters(); - if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) { - return properties.get(TableProperties.DEFAULT_FILE_FORMAT); - } - if (properties.containsKey(FLINK_WRITE_FORMAT)) { - return properties.get(FLINK_WRITE_FORMAT); - } - return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + return IcebergUtils.getFileFormat(icebergTable); } public org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java index 270a4d4df18f61..b4b1bf2a805d12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java @@ -27,9 +27,6 @@ public interface IcebergSource { - // compatible with flink, which is "write.format.default" in spark - String FLINK_WRITE_FORMAT = "write-format"; - TupleDescriptor getDesc(); org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundBaseExternalTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundBaseExternalTableSink.java new file mode 100644 index 00000000000000..cfdefc59872d4e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundBaseExternalTableSink.java @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.analyzer; + +import org.apache.doris.nereids.exceptions.UnboundException; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.UnboundLogicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.BlockFuncDepsPropagation; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; +import org.apache.doris.nereids.util.Utils; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * Represent an external table sink plan node that has not been bound. 
+ */ +public abstract class UnboundBaseExternalTableSink extends UnboundLogicalSink + implements Unbound, Sink, BlockFuncDepsPropagation { + List hints; + List partitions; + + /** + * constructor + */ + public UnboundBaseExternalTableSink(List nameParts, + PlanType type, + List outputExprs, + Optional groupExpression, + Optional logicalProperties, + List colNames, + DMLCommandType dmlCommandType, + CHILD_TYPE child, + List hints, + List partitions) { + super(nameParts, type, outputExprs, groupExpression, + logicalProperties, colNames, dmlCommandType, child); + this.hints = Utils.copyRequiredList(hints); + this.partitions = Utils.copyRequiredList(partitions); + } + + public List getColNames() { + return colNames; + } + + public List getPartitions() { + return partitions; + } + + public List getHints() { + return hints; + } + + @Override + public UnboundBaseExternalTableSink withOutputExprs(List outputExprs) { + throw new UnboundException("could not call withOutputExprs on " + this.getClass().getSimpleName()); + } + + @Override + public List getExpressions() { + throw new UnsupportedOperationException(this.getClass().getSimpleName() + " don't support getExpression()"); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UnboundBaseExternalTableSink that = (UnboundBaseExternalTableSink) o; + return Objects.equals(nameParts, that.nameParts) + && Objects.equals(colNames, that.colNames) + && Objects.equals(hints, that.hints) + && Objects.equals(partitions, that.partitions); + } + + @Override + public int hashCode() { + return Objects.hash(nameParts, colNames, hints, partitions); + } + + @Override + public LogicalProperties computeLogicalProperties() { + return UnboundLogicalProperties.INSTANCE; + } + + @Override + public List computeOutput() { + throw new UnboundException("output"); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java index 0b56c2b681d087..4ffbc0230a005e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java @@ -17,36 +17,23 @@ package org.apache.doris.nereids.analyzer; -import org.apache.doris.nereids.exceptions.UnboundException; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; -import org.apache.doris.nereids.properties.UnboundLogicalProperties; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.NamedExpression; -import org.apache.doris.nereids.trees.expressions.Slot; -import org.apache.doris.nereids.trees.plans.BlockFuncDepsPropagation; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; -import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; -import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; -import org.apache.doris.nereids.util.Utils; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.List; -import java.util.Objects; import java.util.Optional; /** - * Represent an hive table sink plan node that has 
not been bound. + * Represent a hive table sink plan node that has not been bound. */ -public class UnboundHiveTableSink extends UnboundLogicalSink - implements Unbound, Sink, BlockFuncDepsPropagation { - private final List hints; - private final List partitions; +public class UnboundHiveTableSink extends UnboundBaseExternalTableSink { public UnboundHiveTableSink(List nameParts, List colNames, List hints, List partitions, CHILD_TYPE child) { @@ -57,28 +44,21 @@ public UnboundHiveTableSink(List nameParts, List colNames, List< /** * constructor */ - public UnboundHiveTableSink(List nameParts, List colNames, List hints, + public UnboundHiveTableSink(List nameParts, + List colNames, + List hints, List partitions, DMLCommandType dmlCommandType, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { super(nameParts, PlanType.LOGICAL_UNBOUND_HIVE_TABLE_SINK, ImmutableList.of(), groupExpression, - logicalProperties, colNames, dmlCommandType, child); - this.hints = Utils.copyRequiredList(hints); - this.partitions = Utils.copyRequiredList(partitions); - } - - public List getColNames() { - return colNames; - } - - public List getPartitions() { - return partitions; + logicalProperties, colNames, dmlCommandType, child, hints, partitions); } - public List getHints() { - return hints; + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitUnboundHiveTableSink(this, context); } @Override @@ -86,64 +66,19 @@ public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "UnboundHiveTableSink only accepts one child"); return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, - dmlCommandType, groupExpression, Optional.empty(), children.get(0)); - } - - @Override - public UnboundHiveTableSink withOutputExprs(List outputExprs) { - throw new UnboundException("could not call withOutputExprs on UnboundHiveTableSink"); - } - - @Override - public R accept(PlanVisitor visitor, C context) { - return visitor.visitUnboundHiveTableSink(this, context); - } - - @Override - public List getExpressions() { - throw new UnsupportedOperationException(this.getClass().getSimpleName() + " don't support getExpression()"); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - UnboundHiveTableSink that = (UnboundHiveTableSink) o; - return Objects.equals(nameParts, that.nameParts) - && Objects.equals(colNames, that.colNames) - && Objects.equals(hints, that.hints) - && Objects.equals(partitions, that.partitions); - } - - @Override - public int hashCode() { - return Objects.hash(nameParts, colNames, hints, partitions); + dmlCommandType, groupExpression, Optional.empty(), children.get(0)); } @Override public Plan withGroupExpression(Optional groupExpression) { return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, - dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, - Optional logicalProperties, List children) { + Optional logicalProperties, List children) { return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, - dmlCommandType, groupExpression, logicalProperties, children.get(0)); - } - - @Override - public LogicalProperties computeLogicalProperties() { - return UnboundLogicalProperties.INSTANCE; - } - - 
@Override - public List computeOutput() { - throw new UnboundException("output"); + dmlCommandType, groupExpression, logicalProperties, children.get(0)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundIcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundIcebergTableSink.java new file mode 100644 index 00000000000000..a540e3a9067d53 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundIcebergTableSink.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.analyzer; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Optional; + +/** + * Represent an iceberg table sink plan node that has not been bound. 
+ */ +public class UnboundIcebergTableSink extends UnboundBaseExternalTableSink { + + public UnboundIcebergTableSink(List nameParts, List colNames, List hints, + List partitions, CHILD_TYPE child) { + this(nameParts, colNames, hints, partitions, DMLCommandType.NONE, + Optional.empty(), Optional.empty(), child); + } + + /** + * constructor + */ + public UnboundIcebergTableSink(List nameParts, + List colNames, + List hints, + List partitions, + DMLCommandType dmlCommandType, + Optional groupExpression, + Optional logicalProperties, + CHILD_TYPE child) { + super(nameParts, PlanType.LOGICAL_UNBOUND_HIVE_TABLE_SINK, ImmutableList.of(), groupExpression, + logicalProperties, colNames, dmlCommandType, child, hints, partitions); + } + + @Override + public Plan withChildren(List children) { + Preconditions.checkArgument(children.size() == 1, + "UnboundIcebergTableSink only accepts one child"); + return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, Optional.empty(), children.get(0)); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitUnboundIcebergTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, groupExpression, logicalProperties, children.get(0)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java index 974bec90a2cdcc..fd70401f25d3c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java @@ -22,6 +22,7 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; +import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.exceptions.ParseException; import org.apache.doris.nereids.trees.plans.Plan; @@ -53,6 +54,8 @@ public static LogicalSink createUnboundTableSink(List na return new UnboundTableSink<>(nameParts, colNames, hints, partitions, query); } else if (curCatalog instanceof HMSExternalCatalog) { return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, query); + } else if (curCatalog instanceof IcebergExternalCatalog) { + return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, query); } throw new UserException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported."); } @@ -72,6 +75,9 @@ public static LogicalSink createUnboundTableSink(List na } else if (curCatalog instanceof HMSExternalCatalog) { return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, dmlCommandType, Optional.empty(), Optional.empty(), plan); + } else if (curCatalog instanceof IcebergExternalCatalog) { + return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(),
plan); } throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported."); } @@ -101,6 +107,9 @@ public static LogicalSink createUnboundTableSinkMaybeOverwrite(L } else if (curCatalog instanceof HMSExternalCatalog && !isAutoDetectPartition) { return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, dmlCommandType, Optional.empty(), Optional.empty(), plan); + } else if (curCatalog instanceof IcebergExternalCatalog && !isAutoDetectPartition) { + return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, + dmlCommandType, Optional.empty(), Optional.empty(), plan); } throw new AnalysisException( "Auto overwrite data to " + curCatalog.getClass().getSimpleName() + " is not supported." diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 6748e8df249349..d3bf9e4d737b65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -121,6 +121,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect; import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; @@ -166,6 +167,7 @@ import org.apache.doris.planner.HashJoinNode; import org.apache.doris.planner.HashJoinNode.DistributionMode; import org.apache.doris.planner.HiveTableSink; +import org.apache.doris.planner.IcebergTableSink; import org.apache.doris.planner.IntersectNode; import org.apache.doris.planner.JoinNodeBase; import org.apache.doris.planner.MultiCastDataSink; @@ -455,7 +457,28 @@ public PlanFragment visitPhysicalHiveTableSink(PhysicalHiveTableSink icebergTableSink, + PlanTranslatorContext context) { + PlanFragment rootFragment = icebergTableSink.child().accept(this, context); + rootFragment.setOutputPartition(DataPartition.UNPARTITIONED); + + TupleDescriptor hiveTuple = context.generateTupleDesc(); + List targetTableColumns = icebergTableSink.getTargetTable().getFullSchema(); + for (Column column : targetTableColumns) { + SlotDescriptor slotDesc = context.addSlotDesc(hiveTuple); + slotDesc.setIsMaterialized(true); + slotDesc.setType(column.getType()); + slotDesc.setColumn(column); + slotDesc.setIsNullable(column.isAllowNull()); + slotDesc.setAutoInc(column.isAutoInc()); + } + IcebergTableSink sink = new IcebergTableSink((IcebergExternalTable) icebergTableSink.getTargetTable()); rootFragment.setSink(sink); return rootFragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java index 67f0c1c3ba902f..ab817c2f1d7c56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/TurnOffPageCacheForInsertIntoSelect.java @@ -25,6 +25,7 @@ import 
org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.VariableMgr; @@ -59,6 +60,13 @@ public Plan visitLogicalHiveTableSink(LogicalHiveTableSink table return tableSink; } + @Override + public Plan visitLogicalIcebergTableSink( + LogicalIcebergTableSink tableSink, StatementContext context) { + turnOffPageCache(context); + return tableSink; + } + private void turnOffPageCache(StatementContext context) { SessionVariable sessionVariable = context.getConnectContext().getSessionVariable(); // set temporary session value, and then revert value in the 'finally block' of StmtExecutor#execute diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java index 16086f7e295003..750707c52c4814 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java @@ -39,6 +39,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; @@ -140,6 +141,17 @@ public Void visitPhysicalHiveTableSink(PhysicalHiveTableSink hiv return null; } + @Override + public Void visitPhysicalIcebergTableSink( + PhysicalIcebergTableSink icebergTableSink, PlanContext context) { + if (connectContext != null && !connectContext.getSessionVariable().enableStrictConsistencyDml) { + addRequestPropertyToChildren(PhysicalProperties.ANY); + } else { + addRequestPropertyToChildren(icebergTableSink.getRequirePhysicalProperties()); + } + return null; + } + @Override public Void visitPhysicalResultSink(PhysicalResultSink physicalResultSink, PlanContext context) { addRequestPropertyToChildren(PhysicalProperties.GATHER); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java index d317b1e8738521..1525172d5afcf7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java @@ -66,6 +66,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter; import org.apache.doris.nereids.rules.implementation.LogicalGenerateToPhysicalGenerate; import org.apache.doris.nereids.rules.implementation.LogicalHiveTableSinkToPhysicalHiveTableSink; +import org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink; import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect; import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan; import 
org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin; @@ -191,6 +192,7 @@ public class RuleSet { .add(new LogicalGenerateToPhysicalGenerate()) .add(new LogicalOlapTableSinkToPhysicalOlapTableSink()) .add(new LogicalHiveTableSinkToPhysicalHiveTableSink()) + .add(new LogicalIcebergTableSinkToPhysicalIcebergTableSink()) .add(new LogicalFileSinkToPhysicalFileSink()) .add(new LogicalResultSinkToPhysicalResultSink()) .add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index c4eb7fe9b06348..316355200ee33d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -31,6 +31,7 @@ public enum RuleType { // **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in the rewrite rules. **** BINDING_RESULT_SINK(RuleTypeClass.REWRITE), BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE), + BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE), BINDING_INSERT_FILE(RuleTypeClass.REWRITE), BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE), @@ -398,6 +399,7 @@ public enum RuleType { LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), + LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index 7a56d2523e0c33..63f6db8fad93e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -29,8 +29,11 @@ import org.apache.doris.common.Pair; import org.apache.doris.datasource.hive.HMSExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.iceberg.IcebergExternalDatabase; +import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; +import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -54,6 +57,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; @@ -102,7 +106,9 @@ public List 
buildRules() { }) ), // TODO: bind hive taget table - RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink)) + RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink)), + RuleType.BINDING_INSERT_ICEBERG_TABLE.build( + unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)) ); } @@ -393,12 +399,53 @@ private Plan bindHiveTableSink(MatchingContext> ctx) Column column = table.getColumn(cn); if (column == null) { throw new AnalysisException(String.format("column %s is not found in table %s", - cn, table.getName())); + cn, table.getName())); } return column; }).collect(ImmutableList.toImmutableList()); } LogicalHiveTableSink boundSink = new LogicalHiveTableSink<>( + database, + table, + bindColumns, + child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()), + sink.getDMLCommandType(), + Optional.empty(), + Optional.empty(), + child); + // we need to insert all the columns of the target table + if (boundSink.getCols().size() != child.getOutput().size()) { + throw new AnalysisException("insert into cols should be corresponding to the query output"); + } + Map columnToOutput = getColumnToOutput(ctx, table, false, + boundSink, child); + LogicalProject fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput); + return boundSink.withChildAndUpdateOutput(fullOutputProject); + } + + private Plan bindIcebergTableSink(MatchingContext> ctx) { + UnboundIcebergTableSink sink = ctx.root; + Pair pair = bind(ctx.cascadesContext, sink); + IcebergExternalDatabase database = pair.first; + IcebergExternalTable table = pair.second; + LogicalPlan child = ((LogicalPlan) sink.child()); + + List bindColumns; + if (sink.getColNames().isEmpty()) { + bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList()); + } else { + bindColumns = sink.getColNames().stream().map(cn -> { + Column column = table.getColumn(cn); + if (column == null) { + throw new AnalysisException(String.format("column %s is not found in table %s", + cn, table.getName())); + } + return column; + }).collect(ImmutableList.toImmutableList()); + } + LogicalIcebergTableSink boundSink = new LogicalIcebergTableSink<>( database, table, bindColumns, @@ -442,11 +489,26 @@ private Pair bind(CascadesContext cascade Pair, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier, cascadesContext.getConnectContext().getEnv()); if (pair.second instanceof HMSExternalTable) { - return Pair.of(((HMSExternalDatabase) pair.first), (HMSExternalTable) pair.second); + HMSExternalTable table = (HMSExternalTable) pair.second; + if (table.getDlaType() == HMSExternalTable.DLAType.HIVE) { + return Pair.of(((HMSExternalDatabase) pair.first), table); + } } throw new AnalysisException("the target table of insert into is not a Hive table"); } + private Pair bind(CascadesContext cascadesContext, + UnboundIcebergTableSink sink) { + List tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), + sink.getNameParts()); + Pair, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier, + cascadesContext.getConnectContext().getEnv()); + if (pair.second instanceof IcebergExternalTable) { + return Pair.of(((IcebergExternalDatabase) pair.first), (IcebergExternalTable) pair.second); + } + throw new AnalysisException("the target table of insert into is not an iceberg table"); + } + private List bindPartitionIds(OlapTable table, List partitions, boolean temp) 
{ return partitions.isEmpty() ? ImmutableList.of() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalIcebergTableSinkToPhysicalIcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalIcebergTableSinkToPhysicalIcebergTableSink.java new file mode 100644 index 00000000000000..c520ef83f2730c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalIcebergTableSinkToPhysicalIcebergTableSink.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.implementation; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; + +import java.util.Optional; + +/** + * Implementation rule that convert logical IcebergTableSink to physical IcebergTableSink. 
+ */ +public class LogicalIcebergTableSinkToPhysicalIcebergTableSink extends OneImplementationRuleFactory { + @Override + public Rule build() { + return logicalIcebergTableSink().thenApply(ctx -> { + LogicalIcebergTableSink sink = ctx.root; + return new PhysicalIcebergTableSink<>( + sink.getDatabase(), + sink.getTargetTable(), + sink.getCols(), + sink.getOutputExprs(), + Optional.empty(), + sink.getLogicalProperties(), + null, + null, + sink.child()); + }).toRule(RuleType.LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 4aeecdfd507799..71a7c24c9b64c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -48,6 +48,7 @@ public enum PlanType { LOGICAL_FILE_SINK, LOGICAL_OLAP_TABLE_SINK, LOGICAL_HIVE_TABLE_SINK, + LOGICAL_ICEBERG_TABLE_SINK, LOGICAL_RESULT_SINK, LOGICAL_UNBOUND_OLAP_TABLE_SINK, LOGICAL_UNBOUND_HIVE_TABLE_SINK, @@ -100,6 +101,7 @@ public enum PlanType { PHYSICAL_FILE_SINK, PHYSICAL_OLAP_TABLE_SINK, PHYSICAL_HIVE_TABLE_SINK, + PHYSICAL_ICEBERG_TABLE_SINK, PHYSICAL_RESULT_SINK, // physical others diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertCommandContext.java new file mode 100644 index 00000000000000..eb2b45e767c346 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertCommandContext.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.commands.insert; + +/** + * For Base External Table + */ +public class BaseExternalTableInsertCommandContext extends InsertCommandContext { + protected boolean overwrite = false; + + public boolean isOverwrite() { + return overwrite; + } + + public void setOverwrite(boolean overwrite) { + this.overwrite = overwrite; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java new file mode 100644 index 00000000000000..5a660826601492 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.planner.BaseExternalTableDataSink; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.transaction.TransactionManager; +import org.apache.doris.transaction.TransactionStatus; +import org.apache.doris.transaction.TransactionType; + +import com.google.common.base.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Insert executor for base external table + */ +public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExecutor { + private static final Logger LOG = LogManager.getLogger(BaseExternalTableInsertExecutor.class); + private static final long INVALID_TXN_ID = -1L; + protected long txnId = INVALID_TXN_ID; + protected TransactionStatus txnStatus = TransactionStatus.ABORTED; + protected final TransactionManager transactionManager; + protected final String catalogName; + protected Optional summaryProfile = Optional.empty(); + + /** + * constructor + */ + public BaseExternalTableInsertExecutor(ConnectContext ctx, ExternalTable table, + String labelName, NereidsPlanner planner, + Optional insertCtx) { + super(ctx, table, labelName, planner, insertCtx); 
+ catalogName = table.getCatalog().getName(); + transactionManager = table.getCatalog().getTransactionManager(); + + if (ConnectContext.get().getExecutor() != null) { + summaryProfile = Optional.of(ConnectContext.get().getExecutor().getSummaryProfile()); + } + } + + public long getTxnId() { + return txnId; + } + + /** + * collect commit infos from BEs + */ + protected abstract void setCollectCommitInfoFunc(); + + /** + * At this time, FE has successfully collected all commit information from BEs. + * Before commit this txn, commit information need to be analyzed and processed. + */ + protected abstract void doBeforeCommit() throws UserException; + + /** + * The type of the current transaction + */ + protected abstract TransactionType transactionType(); + + @Override + public void beginTransaction() { + txnId = transactionManager.begin(); + setCollectCommitInfoFunc(); + } + + @Override + protected void onComplete() throws UserException { + if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) { + LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier()); + } else { + doBeforeCommit(); + summaryProfile.ifPresent(profile -> profile.setTransactionBeginTime(transactionType())); + transactionManager.commit(txnId); + summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime); + txnStatus = TransactionStatus.COMMITTED; + Env.getCurrentEnv().getRefreshManager().refreshTable( + catalogName, + table.getDatabase().getFullName(), + table.getName(), + true); + } + } + + @Override + protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { + try { + ((BaseExternalTableDataSink) sink).bindDataSink(insertCtx); + } catch (Exception e) { + throw new AnalysisException(e.getMessage(), e); + } + } + + @Override + protected void onFail(Throwable t) { + errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage(); + String queryId = DebugUtil.printId(ctx.queryId()); + // if any throwable being thrown during insert operation, first we should abort this txn + LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t); + StringBuilder sb = new StringBuilder(t.getMessage()); + if (txnId != INVALID_TXN_ID) { + LOG.warn("insert [{}] with query id {} abort txn {} failed", labelName, queryId, txnId); + if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) { + sb.append(". url: ").append(coordinator.getTrackingUrl()); + } + } + ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage()); + transactionManager.rollback(txnId); + } + + @Override + protected void afterExec(StmtExecutor executor) { + StringBuilder sb = new StringBuilder(); + sb.append("{"); + sb.append("'status':'") + .append(ctx.isTxnModel() ? TransactionStatus.PREPARE.name() : txnStatus.name()); + sb.append("', 'txnId':'").append(txnId).append("'"); + if (!Strings.isNullOrEmpty(errMsg)) { + sb.append(", 'err':'").append(errMsg).append("'"); + } + sb.append("}"); + ctx.getState().setOk(loadedRows, 0, sb.toString()); + // set insert result in connection context, + // so that user can use `show insert result` to get info of the last insert operation. 
+ ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), table.getName(), + txnStatus, loadedRows, 0); + // update it, so that user can get loaded rows in fe.audit.log + ctx.updateReturnRows((int) loadedRows); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java index 49d5a12c2bb7a0..1e68a5cd220733 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java @@ -20,19 +20,10 @@ /** * For Hive Table */ -public class HiveInsertCommandContext extends InsertCommandContext { - private boolean overwrite = false; +public class HiveInsertCommandContext extends BaseExternalTableInsertCommandContext { private String writePath; private String queryId; - public boolean isOverwrite() { - return overwrite; - } - - public void setOverwrite(boolean overwrite) { - this.overwrite = overwrite; - } - public String getWritePath() { return writePath; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java index 9421af79503320..dea731f9af5baf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java @@ -17,47 +17,26 @@ package org.apache.doris.nereids.trees.plans.commands.insert; -import org.apache.doris.catalog.Env; -import org.apache.doris.common.ErrorCode; import org.apache.doris.common.UserException; -import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HMSTransaction; import org.apache.doris.nereids.NereidsPlanner; -import org.apache.doris.nereids.exceptions.AnalysisException; -import org.apache.doris.nereids.trees.plans.Plan; -import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; -import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; -import org.apache.doris.planner.DataSink; -import org.apache.doris.planner.HiveTableSink; -import org.apache.doris.planner.PlanFragment; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.QueryState; -import org.apache.doris.qe.StmtExecutor; import org.apache.doris.thrift.TUniqueId; -import org.apache.doris.transaction.TransactionManager; -import org.apache.doris.transaction.TransactionStatus; import org.apache.doris.transaction.TransactionType; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Optional; /** - * Insert executor for olap table + * Insert executor for hive table */ -public class HiveInsertExecutor extends AbstractInsertExecutor { +public class HiveInsertExecutor extends BaseExternalTableInsertExecutor { private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class); - private static final long INVALID_TXN_ID = -1L; - private long txnId = INVALID_TXN_ID; - private TransactionStatus txnStatus = TransactionStatus.ABORTED; - 
private final TransactionManager transactionManager; - private final String catalogName; - private Optional summaryProfile = Optional.empty(); /** * constructor @@ -66,36 +45,14 @@ public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table, String labelName, NereidsPlanner planner, Optional insertCtx) { super(ctx, table, labelName, planner, insertCtx); - catalogName = table.getCatalog().getName(); - transactionManager = table.getCatalog().getTransactionManager(); - - if (ConnectContext.get().getExecutor() != null) { - summaryProfile = Optional.of(ConnectContext.get().getExecutor().getSummaryProfile()); - } - } - - public long getTxnId() { - return txnId; } @Override - public void beginTransaction() { - txnId = transactionManager.begin(); + public void setCollectCommitInfoFunc() { HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId); coordinator.setHivePartitionUpdateFunc(transaction::updateHivePartitionUpdates); } - @Override - protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { - HiveTableSink hiveTableSink = (HiveTableSink) sink; - PhysicalHiveTableSink physicalHiveSink = (PhysicalHiveTableSink) physicalSink; - try { - hiveTableSink.bindDataSink(physicalHiveSink.getCols(), insertCtx); - } catch (Exception e) { - throw new AnalysisException(e.getMessage(), e); - } - } - @Override protected void beforeExec() { // check params @@ -109,61 +66,16 @@ protected void beforeExec() { } @Override - protected void onComplete() throws UserException { - if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) { - LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier()); - } else { - HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId); - loadedRows = transaction.getUpdateCnt(); - String dbName = ((HMSExternalTable) table).getDbName(); - String tbName = table.getName(); - transaction.finishInsertTable(dbName, tbName); - summaryProfile.ifPresent(profile -> profile.setTransactionBeginTime(TransactionType.HMS)); - transactionManager.commit(txnId); - summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime); - txnStatus = TransactionStatus.COMMITTED; - Env.getCurrentEnv().getRefreshManager().refreshTable( - catalogName, - dbName, - tbName, - true); - } - } - - @Override - protected void onFail(Throwable t) { - errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage(); - String queryId = DebugUtil.printId(ctx.queryId()); - // if any throwable being thrown during insert operation, first we should abort this txn - LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t); - StringBuilder sb = new StringBuilder(t.getMessage()); - if (txnId != INVALID_TXN_ID) { - LOG.warn("insert [{}] with query id {} abort txn {} failed", labelName, queryId, txnId); - if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) { - sb.append(". 
url: ").append(coordinator.getTrackingUrl()); - } - } - ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage()); - transactionManager.rollback(txnId); + protected void doBeforeCommit() throws UserException { + HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId); + loadedRows = transaction.getUpdateCnt(); + String dbName = ((HMSExternalTable) table).getDbName(); + String tbName = table.getName(); + transaction.finishInsertTable(dbName, tbName); } @Override - protected void afterExec(StmtExecutor executor) { - StringBuilder sb = new StringBuilder(); - sb.append("{"); - sb.append("'status':'") - .append(ctx.isTxnModel() ? TransactionStatus.PREPARE.name() : txnStatus.name()); - sb.append("', 'txnId':'").append(txnId).append("'"); - if (!Strings.isNullOrEmpty(errMsg)) { - sb.append(", 'err':'").append(errMsg).append("'"); - } - sb.append("}"); - ctx.getState().setOk(loadedRows, 0, sb.toString()); - // set insert result in connection context, - // so that user can use `show insert result` to get info of the last insert operation. - ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), table.getName(), - txnStatus, loadedRows, 0); - // update it, so that user can get loaded rows in fe.audit.log - ctx.updateReturnRows((int) loadedRows); + protected TransactionType transactionType() { + return TransactionType.HMS; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java new file mode 100644 index 00000000000000..cbb291fb3b718b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.datasource.iceberg.IcebergTransaction; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.transaction.TransactionType; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Insert executor for iceberg table + */ +public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor { + private static final Logger LOG = LogManager.getLogger(IcebergInsertExecutor.class); + + /** + * constructor + */ + public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable table, + String labelName, NereidsPlanner planner, + Optional insertCtx) { + super(ctx, table, labelName, planner, insertCtx); + } + + @Override + public void setCollectCommitInfoFunc() { + IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId); + coordinator.setIcebergCommitDataFunc(transaction::updateIcebergCommitData); + } + + @Override + protected void doBeforeCommit() throws UserException { + IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId); + loadedRows = transaction.getUpdateCnt(); + transaction.finishInsert(); + } + + @Override + protected TransactionType transactionType() { + return TransactionType.ICEBERG; + } + + @Override + protected void beforeExec() { + IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId); + transaction.beginInsert(((IcebergExternalTable) table).getDbName(), table.getName()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 7c9ea67860c991..003c2e591165cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -24,6 +24,7 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.util.ProfileManager.ProfileType; import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; @@ -37,6 +38,7 @@ import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -177,9 +179,13 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner, Optional.of(insertCtx.orElse((new HiveInsertCommandContext())))); // set hive query options + } else if (physicalSink instanceof PhysicalIcebergTableSink) { + 
IcebergExternalTable icebergExternalTable = (IcebergExternalTable) targetTableIf; + insertExecutor = new IcebergInsertExecutor(ctx, icebergExternalTable, label, planner, + Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext())))); } else { // TODO: support other table types - throw new AnalysisException("insert into command only support olap table"); + throw new AnalysisException("insert into command only support [olap, hive, iceberg] table"); } insertExecutor.beginTransaction(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index 8293183eeb200e..cabdfc203e5d5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -28,6 +28,7 @@ import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; +import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; import org.apache.doris.nereids.analyzer.UnboundOneRowRelation; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -369,8 +370,11 @@ public static TableIf getTargetTable(Plan plan, ConnectContext ctx) { unboundTableSink = (UnboundTableSink) plan; } else if (plan instanceof UnboundHiveTableSink) { unboundTableSink = (UnboundHiveTableSink) plan; + } else if (plan instanceof UnboundIcebergTableSink) { + unboundTableSink = (UnboundIcebergTableSink) plan; } else { - throw new AnalysisException("the root of plan should be UnboundTableSink or UnboundHiveTableSink" + throw new AnalysisException("the root of plan should be" + + " [UnboundTableSink, UnboundHiveTableSink, UnboundIcebergTableSink]," + " but it is " + plan.getType()); } List tableQualifier = RelationUtil.getQualifierName(ctx, unboundTableSink.getNameParts()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalIcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalIcebergTableSink.java new file mode 100644 index 00000000000000..d121645b231c03 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalIcebergTableSink.java @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.iceberg.IcebergExternalDatabase; +import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.PropagateFuncDeps; +import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** + * logical iceberg table sink for insert command + */ +public class LogicalIcebergTableSink extends LogicalTableSink + implements Sink, PropagateFuncDeps { + // bound data sink + private final IcebergExternalDatabase database; + private final IcebergExternalTable targetTable; + private final DMLCommandType dmlCommandType; + + /** + * constructor + */ + public LogicalIcebergTableSink(IcebergExternalDatabase database, + IcebergExternalTable targetTable, + List cols, + List outputExprs, + DMLCommandType dmlCommandType, + Optional groupExpression, + Optional logicalProperties, + CHILD_TYPE child) { + super(PlanType.LOGICAL_ICEBERG_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child); + this.database = Objects.requireNonNull(database, "database != null in LogicalIcebergTableSink"); + this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalIcebergTableSink"); + this.dmlCommandType = dmlCommandType; + } + + public Plan withChildAndUpdateOutput(Plan child) { + List output = child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()); + return new LogicalIcebergTableSink<>(database, targetTable, cols, output, + dmlCommandType, Optional.empty(), Optional.empty(), child); + } + + @Override + public Plan withChildren(List children) { + Preconditions.checkArgument(children.size() == 1, "LogicalIcebergTableSink only accepts one child"); + return new LogicalIcebergTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); + } + + public LogicalIcebergTableSink withOutputExprs(List outputExprs) { + return new LogicalIcebergTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, Optional.empty(), Optional.empty(), child()); + } + + public IcebergExternalDatabase getDatabase() { + return database; + } + + public IcebergExternalTable getTargetTable() { + return targetTable; + } + + public DMLCommandType getDmlCommandType() { + return dmlCommandType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + LogicalIcebergTableSink that = (LogicalIcebergTableSink) o; + return dmlCommandType == that.dmlCommandType + && Objects.equals(database, that.database) + && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols); + } + + @Override + public int hashCode() { + return
Objects.hash(super.hashCode(), database, targetTable, cols, dmlCommandType); + } + + @Override + public String toString() { + return Utils.toSqlString("LogicalIcebergTableSink[" + id.asInt() + "]", + "outputExprs", outputExprs, + "database", database.getFullName(), + "targetTable", targetTable.getName(), + "cols", cols, + "dmlCommandType", dmlCommandType + ); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitLogicalIcebergTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new LogicalIcebergTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new LogicalIcebergTableSink<>(database, targetTable, cols, outputExprs, + dmlCommandType, groupExpression, logicalProperties, children.get(0)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java new file mode 100644 index 00000000000000..82483c63a40412 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalBaseExternalTableSink.java @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.ExternalDatabase; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.statistics.Statistics; + +import com.google.common.collect.ImmutableList; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** abstract physical external table sink */ +public abstract class PhysicalBaseExternalTableSink extends PhysicalTableSink + implements Sink { + + protected final ExternalDatabase database; + protected final ExternalTable targetTable; + protected final List cols; + + /** + * constructor + */ + public PhysicalBaseExternalTableSink(PlanType type, + ExternalDatabase database, + ExternalTable targetTable, + List cols, + List outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, + Statistics statistics, + CHILD_TYPE child) { + super(type, outputExprs, groupExpression, + logicalProperties, physicalProperties, statistics, child); + this.database = Objects.requireNonNull( + database, "database != null in " + this.getClass().getSimpleName()); + this.targetTable = Objects.requireNonNull( + targetTable, "targetTable != null in " + this.getClass().getSimpleName()); + this.cols = Utils.copyRequiredList(cols); + } + + public ExternalTable getTargetTable() { + return targetTable; + } + + @Override + public List getExpressions() { + return ImmutableList.of(); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java index 58141e61bf8885..4a7febf3d1d9af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java @@ -25,30 +25,20 @@ import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.ExprId; -import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; -import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; -import org.apache.doris.nereids.util.Utils; import org.apache.doris.statistics.Statistics; -import com.google.common.collect.ImmutableList; - import java.util.ArrayList; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -/** abstract physical hive sink */ -public class PhysicalHiveTableSink extends PhysicalTableSink implements Sink { - - private final HMSExternalDatabase database; - private 
final HMSExternalTable targetTable; - private final List cols; +/** physical hive sink */ +public class PhysicalHiveTableSink extends PhysicalBaseExternalTableSink { /** * constructor @@ -76,28 +66,14 @@ public PhysicalHiveTableSink(HMSExternalDatabase database, PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { - super(PlanType.PHYSICAL_HIVE_TABLE_SINK, outputExprs, groupExpression, + super(PlanType.PHYSICAL_HIVE_TABLE_SINK, database, targetTable, cols, outputExprs, groupExpression, logicalProperties, physicalProperties, statistics, child); - this.database = Objects.requireNonNull(database, "database != null in PhysicalHiveTableSink"); - this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalHiveTableSink"); - this.cols = Utils.copyRequiredList(cols); - } - - public HMSExternalDatabase getDatabase() { - return database; - } - - public HMSExternalTable getTargetTable() { - return targetTable; - } - - public List getCols() { - return cols; } @Override public Plan withChildren(List children) { - return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, groupExpression, + return new PhysicalHiveTableSink<>( + (HMSExternalDatabase) database, (HMSExternalTable) targetTable, cols, outputExprs, groupExpression, getLogicalProperties(), physicalProperties, statistics, children.get(0)); } @@ -106,27 +82,25 @@ public R accept(PlanVisitor visitor, C context) { return visitor.visitPhysicalHiveTableSink(this, context); } - @Override - public List getExpressions() { - return ImmutableList.of(); - } - @Override public Plan withGroupExpression(Optional groupExpression) { - return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, + return new PhysicalHiveTableSink<>( + (HMSExternalDatabase) database, (HMSExternalTable) targetTable, cols, outputExprs, groupExpression, getLogicalProperties(), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, + return new PhysicalHiveTableSink<>( + (HMSExternalDatabase) database, (HMSExternalTable) targetTable, cols, outputExprs, groupExpression, logicalProperties.get(), children.get(0)); } @Override public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { - return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, + return new PhysicalHiveTableSink<>( + (HMSExternalDatabase) database, (HMSExternalTable) targetTable, cols, outputExprs, groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } @@ -135,7 +109,7 @@ public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalPr */ @Override public PhysicalProperties getRequirePhysicalProperties() { - Set hivePartitionKeys = targetTable.getPartitionColumnNames(); + Set hivePartitionKeys = ((HMSExternalTable) targetTable).getPartitionColumnNames(); if (!hivePartitionKeys.isEmpty()) { List columnIdx = new ArrayList<>(); List fullSchema = targetTable.getFullSchema(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIcebergTableSink.java new file mode 100644 index 00000000000000..2dd467c5f61948 --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalIcebergTableSink.java @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.iceberg.IcebergExternalDatabase; +import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DistributionSpecTableSinkHashPartitioned; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.statistics.Statistics; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** physical iceberg sink */ +public class PhysicalIcebergTableSink extends PhysicalBaseExternalTableSink { + + /** + * constructor + */ + public PhysicalIcebergTableSink(IcebergExternalDatabase database, + IcebergExternalTable targetTable, + List cols, + List outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + CHILD_TYPE child) { + this(database, targetTable, cols, outputExprs, groupExpression, logicalProperties, + PhysicalProperties.GATHER, null, child); + } + + /** + * constructor + */ + public PhysicalIcebergTableSink(IcebergExternalDatabase database, + IcebergExternalTable targetTable, + List cols, + List outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, + Statistics statistics, + CHILD_TYPE child) { + super(PlanType.PHYSICAL_ICEBERG_TABLE_SINK, database, targetTable, cols, outputExprs, groupExpression, + logicalProperties, physicalProperties, statistics, child); + } + + @Override + public Plan withChildren(List children) { + return new PhysicalIcebergTableSink<>( + (IcebergExternalDatabase) database, (IcebergExternalTable) targetTable, + cols, outputExprs, groupExpression, + getLogicalProperties(), physicalProperties, statistics, children.get(0)); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitPhysicalIcebergTableSink(this, context); + } + + @Override + public Plan withGroupExpression(Optional groupExpression) { + return new PhysicalIcebergTableSink<>( + (IcebergExternalDatabase) database, (IcebergExternalTable) targetTable, cols, outputExprs, + 
groupExpression, getLogicalProperties(), child()); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional groupExpression, + Optional logicalProperties, List children) { + return new PhysicalIcebergTableSink<>( + (IcebergExternalDatabase) database, (IcebergExternalTable) targetTable, cols, outputExprs, + groupExpression, logicalProperties.get(), children.get(0)); + } + + @Override + public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + return new PhysicalIcebergTableSink<>( + (IcebergExternalDatabase) database, (IcebergExternalTable) targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + } + + /** + * get output physical properties + */ + @Override + public PhysicalProperties getRequirePhysicalProperties() { + Set partitionNames = targetTable.getPartitionNames(); + if (!partitionNames.isEmpty()) { + List columnIdx = new ArrayList<>(); + List fullSchema = targetTable.getFullSchema(); + for (int i = 0; i < fullSchema.size(); i++) { + Column column = fullSchema.get(i); + if (partitionNames.contains(column.getName())) { + columnIdx.add(i); + } + } + // mapping partition id + List exprIds = columnIdx.stream() + .map(idx -> child().getOutput().get(idx).getExprId()) + .collect(Collectors.toList()); + DistributionSpecTableSinkHashPartitioned shuffleInfo = new DistributionSpecTableSinkHashPartitioned(); + shuffleInfo.setOutputColExprIds(exprIds); + return new PhysicalProperties(shuffleInfo); + } + return PhysicalProperties.SINK_RANDOM_PARTITIONED; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java index 976592b461e7b6..e0b8a1dddc1706 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java @@ -18,12 +18,14 @@ package org.apache.doris.nereids.trees.plans.visitor; import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; +import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink; import org.apache.doris.nereids.analyzer.UnboundResultSink; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink; import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink; import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; @@ -31,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; @@ -61,6 +64,10 @@ default R visitUnboundHiveTableSink(UnboundHiveTableSink 
unbound return visitLogicalSink(unboundTableSink, context); } + default R visitUnboundIcebergTableSink(UnboundIcebergTableSink unboundTableSink, C context) { + return visitLogicalSink(unboundTableSink, context); + } + default R visitUnboundResultSink(UnboundResultSink unboundResultSink, C context) { return visitLogicalSink(unboundResultSink, context); } @@ -85,6 +92,10 @@ default R visitLogicalHiveTableSink(LogicalHiveTableSink hiveTab return visitLogicalTableSink(hiveTableSink, context); } + default R visitLogicalIcebergTableSink(LogicalIcebergTableSink icebergTableSink, C context) { + return visitLogicalTableSink(icebergTableSink, context); + } + default R visitLogicalResultSink(LogicalResultSink logicalResultSink, C context) { return visitLogicalSink(logicalResultSink, context); } @@ -114,6 +125,10 @@ default R visitPhysicalHiveTableSink(PhysicalHiveTableSink hiveT return visitPhysicalTableSink(hiveTableSink, context); } + default R visitPhysicalIcebergTableSink(PhysicalIcebergTableSink icebergTableSink, C context) { + return visitPhysicalTableSink(icebergTableSink, context); + } + default R visitPhysicalResultSink(PhysicalResultSink physicalResultSink, C context) { return visitPhysicalSink(physicalResultSink, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BaseExternalTableDataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BaseExternalTableDataSink.java new file mode 100644 index 00000000000000..92350e95013c52 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BaseExternalTableDataSink.java @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
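BaseExternalTableDataSink below centralizes the mapping from table-level format and compression strings to the thrift enums sent to BEs. A standalone sketch of that matching rule (illustrative only; it mirrors the substring checks in getTFileFormatType and the compression aliases in getTFileCompressType, and the Hive ORC input-format class name is used purely as an example input):

import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;

class ExternalSinkFormatMappingSketch {
    // Mirrors the substring-based matching in getTFileFormatType: Hive storage descriptors pass
    // class names such as "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", while Iceberg passes
    // plain property values such as "parquet".
    static TFileFormatType formatOf(String format) {
        String lower = format.toLowerCase();
        if (lower.contains("orc")) {
            return TFileFormatType.FORMAT_ORC;
        } else if (lower.contains("parquet")) {
            return TFileFormatType.FORMAT_PARQUET;
        } else if (lower.contains("text")) {
            return TFileFormatType.FORMAT_CSV_PLAIN;
        }
        // concrete sinks reject anything outside supportedFileFormatTypes()
        return TFileFormatType.FORMAT_UNKNOWN;
    }

    // Mirrors getTFileCompressType: unrecognized values fall back to PLAIN.
    static TFileCompressType compressOf(String compress) {
        switch (compress.toLowerCase()) {
            case "snappy": return TFileCompressType.SNAPPYBLOCK;
            case "lz4": return TFileCompressType.LZ4BLOCK;
            case "lzo": return TFileCompressType.LZO;
            case "zlib": return TFileCompressType.ZLIB;
            case "zstd": return TFileCompressType.ZSTD;
            default: return TFileCompressType.PLAIN;
        }
    }
}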
+// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/DataSink.java +// and modified by Doris + +package org.apache.doris.planner; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext; +import org.apache.doris.thrift.TDataSink; +import org.apache.doris.thrift.TFileCompressType; +import org.apache.doris.thrift.TFileFormatType; + +import java.util.Optional; +import java.util.Set; + +public abstract class BaseExternalTableDataSink extends DataSink { + + protected TDataSink tDataSink; + + @Override + protected TDataSink toThrift() { + return tDataSink; + } + + @Override + public PlanNodeId getExchNodeId() { + return null; + } + + @Override + public DataPartition getOutputPartition() { + return DataPartition.RANDOM; + } + + /** + * File format types supported by the current table + */ + protected abstract Set supportedFileFormatTypes(); + + protected TFileFormatType getTFileFormatType(String format) throws AnalysisException { + TFileFormatType fileFormatType = TFileFormatType.FORMAT_UNKNOWN; + String lowerCase = format.toLowerCase(); + if (lowerCase.contains("orc")) { + fileFormatType = TFileFormatType.FORMAT_ORC; + } else if (lowerCase.contains("parquet")) { + fileFormatType = TFileFormatType.FORMAT_PARQUET; + } else if (lowerCase.contains("text")) { + fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; + } + if (!supportedFileFormatTypes().contains(fileFormatType)) { + throw new AnalysisException("Unsupported input format type: " + format); + } + return fileFormatType; + } + + protected TFileCompressType getTFileCompressType(String compressType) { + if ("snappy".equalsIgnoreCase(compressType)) { + return TFileCompressType.SNAPPYBLOCK; + } else if ("lz4".equalsIgnoreCase(compressType)) { + return TFileCompressType.LZ4BLOCK; + } else if ("lzo".equalsIgnoreCase(compressType)) { + return TFileCompressType.LZO; + } else if ("zlib".equalsIgnoreCase(compressType)) { + return TFileCompressType.ZLIB; + } else if ("zstd".equalsIgnoreCase(compressType)) { + return TFileCompressType.ZSTD; + } else if ("uncompressed".equalsIgnoreCase(compressType)) { + return TFileCompressType.PLAIN; + } else { + // try to use plain type to decompress parquet or orc file + return TFileCompressType.PLAIN; + } + } + + /** + * check sink params and generate thrift data sink to BE + * @param insertCtx insert info context + * @throws AnalysisException if source file format cannot be read + */ + public abstract void bindDataSink(Optional insertCtx) throws AnalysisException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java index c769bbea782d0a..8d6daa2f8b72b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.odbc.sink.OdbcTableSink; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TExplainLevel; @@ -70,6 +71,8 @@ public static DataSink createDataSink(TableIf table) throws AnalysisException { return new OdbcTableSink((OdbcTable) table); } else if (table instanceof HMSExternalTable) { return new 
HiveTableSink((HMSExternalTable) table); + } else if (table instanceof IcebergExternalTable) { + return new IcebergTableSink((IcebergExternalTable) table); } else { throw new AnalysisException("Unknown table type " + table.getType()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java index 6374dfddc6636a..efa9bd9b8f8346 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java @@ -31,7 +31,6 @@ import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TExplainLevel; -import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.THiveBucket; @@ -50,16 +49,24 @@ import java.util.Set; import java.util.stream.Collectors; -public class HiveTableSink extends DataSink { +public class HiveTableSink extends BaseExternalTableDataSink { - private HMSExternalTable targetTable; - protected TDataSink tDataSink; + private final HMSExternalTable targetTable; + private static final HashSet supportedTypes = new HashSet() {{ + add(TFileFormatType.FORMAT_ORC); + add(TFileFormatType.FORMAT_PARQUET); + }}; public HiveTableSink(HMSExternalTable targetTable) { super(); this.targetTable = targetTable; } + @Override + protected Set supportedFileFormatTypes() { + return supportedTypes; + } + @Override public String getExplainString(String prefix, TExplainLevel explainLevel) { StringBuilder strBuilder = new StringBuilder(); @@ -72,26 +79,7 @@ public String getExplainString(String prefix, TExplainLevel explainLevel) { } @Override - protected TDataSink toThrift() { - return tDataSink; - } - - @Override - public PlanNodeId getExchNodeId() { - return null; - } - - @Override - public DataPartition getOutputPartition() { - return DataPartition.RANDOM; - } - - /** - * check sink params and generate thrift data sink to BE - * @param insertCtx insert info context - * @throws AnalysisException if source file format cannot be read - */ - public void bindDataSink(List insertCols, Optional insertCtx) + public void bindDataSink(Optional insertCtx) throws AnalysisException { THiveTableSink tSink = new THiveTableSink(); tSink.setDbName(targetTable.getDbName()); @@ -124,7 +112,7 @@ public void bindDataSink(List insertCols, Optional bucketInfo.setBucketCount(sd.getNumBuckets()); tSink.setBucketInfo(bucketInfo); - TFileFormatType formatType = getFileFormatType(sd); + TFileFormatType formatType = getTFileFormatType(sd.getInputFormat()); tSink.setFileFormat(formatType); setCompressType(tSink, formatType); @@ -180,23 +168,7 @@ private void setCompressType(THiveTableSink tSink, TFileFormatType formatType) { compressType = "uncompressed"; break; } - - if ("snappy".equalsIgnoreCase(compressType)) { - tSink.setCompressionType(TFileCompressType.SNAPPYBLOCK); - } else if ("lz4".equalsIgnoreCase(compressType)) { - tSink.setCompressionType(TFileCompressType.LZ4BLOCK); - } else if ("lzo".equalsIgnoreCase(compressType)) { - tSink.setCompressionType(TFileCompressType.LZO); - } else if ("zlib".equalsIgnoreCase(compressType)) { - tSink.setCompressionType(TFileCompressType.ZLIB); - } else if ("zstd".equalsIgnoreCase(compressType)) { - tSink.setCompressionType(TFileCompressType.ZSTD); - } else if ("uncompressed".equalsIgnoreCase(compressType)) { - 
tSink.setCompressionType(TFileCompressType.PLAIN); - } else { - // try to use plain type to decompress parquet or orc file - tSink.setCompressionType(TFileCompressType.PLAIN); - } + tSink.setCompressionType(getTFileCompressType(compressType)); } private void setPartitionValues(THiveTableSink tSink) throws AnalysisException { @@ -207,7 +179,7 @@ private void setPartitionValues(THiveTableSink tSink) throws AnalysisException { for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) { THivePartition hivePartition = new THivePartition(); StorageDescriptor sd = partition.getSd(); - hivePartition.setFileFormat(getFileFormatType(sd)); + hivePartition.setFileFormat(getTFileFormatType(sd.getInputFormat())); hivePartition.setValues(partition.getValues()); THiveLocationParams locationParams = new THiveLocationParams(); @@ -222,20 +194,6 @@ private void setPartitionValues(THiveTableSink tSink) throws AnalysisException { tSink.setPartitions(partitions); } - private TFileFormatType getFileFormatType(StorageDescriptor sd) throws AnalysisException { - TFileFormatType fileFormatType; - if (sd.getInputFormat().toLowerCase().contains("orc")) { - fileFormatType = TFileFormatType.FORMAT_ORC; - } else if (sd.getInputFormat().toLowerCase().contains("parquet")) { - fileFormatType = TFileFormatType.FORMAT_PARQUET; - } else if (sd.getInputFormat().toLowerCase().contains("text")) { - fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; - } else { - throw new AnalysisException("Unsupported input format type: " + sd.getInputFormat()); - } - return fileFormatType; - } - protected TDataSinkType getDataSinkType() { return TDataSinkType.HIVE_TABLE_SINK; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java new file mode 100644 index 00000000000000..659be7cb1fed98 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
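IcebergTableSink below serializes the Iceberg schema and partition spec as JSON (via SchemaParser and PartitionSpecParser) so BE writers can reconstruct them. A self-contained sketch of that round-trip using only Iceberg's public API (the example schema and spec are made up for illustration; the JSON strings correspond to what bindDataSink passes to setSchemaJson and setPartitionSpecsJson):

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.types.Types;

class IcebergSinkJsonSketch {
    public static void main(String[] args) {
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.required(2, "dt", Types.DateType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).day("dt").build();

        // The payload shipped to BEs in TIcebergTableSink.
        String schemaJson = SchemaParser.toJson(schema);
        String specJson = PartitionSpecParser.toJson(spec);
        System.out.println(schemaJson);
        System.out.println(specJson);

        // The JSON is reversible with the matching parsers.
        Schema parsedSchema = SchemaParser.fromJson(schemaJson);
        PartitionSpec parsedSpec = PartitionSpecParser.fromJson(parsedSchema, specJson);
        System.out.println(parsedSchema.sameSchema(schema)); // true
        System.out.println(parsedSpec.fields().size());      // 1
    }
}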
+ +package org.apache.doris.planner; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.LocationPath; +import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.datasource.iceberg.IcebergUtils; +import org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext; +import org.apache.doris.thrift.TDataSink; +import org.apache.doris.thrift.TDataSinkType; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TIcebergTableSink; +import org.apache.doris.thrift.TSortField; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortField; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.types.Types; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class IcebergTableSink extends BaseExternalTableDataSink { + + private final IcebergExternalTable targetTable; + private static final HashSet supportedTypes = new HashSet() {{ + add(TFileFormatType.FORMAT_ORC); + add(TFileFormatType.FORMAT_PARQUET); + }}; + + public IcebergTableSink(IcebergExternalTable targetTable) { + super(); + this.targetTable = targetTable; + } + + @Override + protected Set supportedFileFormatTypes() { + return supportedTypes; + } + + @Override + public String getExplainString(String prefix, TExplainLevel explainLevel) { + StringBuilder strBuilder = new StringBuilder(); + strBuilder.append(prefix).append("ICEBERG TABLE SINK\n"); + if (explainLevel == TExplainLevel.BRIEF) { + return strBuilder.toString(); + } + // TODO: explain partitions + return strBuilder.toString(); + } + + @Override + public void bindDataSink(Optional insertCtx) + throws AnalysisException { + + TIcebergTableSink tSink = new TIcebergTableSink(); + + Table icebergTable = targetTable.getIcebergTable(); + + tSink.setDbName(targetTable.getDbName()); + tSink.setTbName(targetTable.getName()); + + // schema + tSink.setSchemaJson(SchemaParser.toJson(icebergTable.schema())); + + // partition spec + if (icebergTable.spec().isPartitioned()) { + tSink.setPartitionSpecsJson(Maps.transformValues(icebergTable.specs(), PartitionSpecParser::toJson)); + tSink.setPartitionSpecId(icebergTable.spec().specId()); + } + + // sort order + if (icebergTable.sortOrder().isSorted()) { + SortOrder sortOrder = icebergTable.sortOrder(); + Set baseColumnFieldIds = icebergTable.schema().columns().stream() + .map(Types.NestedField::fieldId) + .collect(ImmutableSet.toImmutableSet()); + ImmutableList.Builder sortFields = ImmutableList.builder(); + for (SortField sortField : sortOrder.fields()) { + if (!sortField.transform().isIdentity()) { + continue; + } + if (!baseColumnFieldIds.contains(sortField.sourceId())) { + continue; + } + TSortField tSortField = new TSortField(); + tSortField.setSourceColumnId(sortField.sourceId()); + tSortField.setAscending(sortField.direction().equals(SortDirection.ASC)); + tSortField.setNullFirst(sortField.nullOrder().equals(NullOrder.NULLS_FIRST)); + sortFields.add(tSortField); + } + 
tSink.setSortFields(sortFields.build()); + } + + // file info + tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable))); + tSink.setCompressionType(getTFileCompressType(IcebergUtils.getFileCompress(icebergTable))); + + // hadoop config + HashMap props = new HashMap<>(icebergTable.properties()); + Map catalogProps = targetTable.getCatalog().getProperties(); + props.putAll(catalogProps); + tSink.setHadoopConfig(props); + + // location + LocationPath locationPath = new LocationPath(IcebergUtils.dataLocation(icebergTable), catalogProps); + tSink.setOutputPath(locationPath.toStorageLocation().toString()); + tSink.setOriginalOutputPath(locationPath.toString()); + tSink.setFileType(locationPath.getTFileTypeForBE()); + + if (insertCtx.isPresent()) { + BaseExternalTableInsertCommandContext context = (BaseExternalTableInsertCommandContext) insertCtx.get(); + tSink.setOverwrite(context.isOverwrite()); + } + tDataSink = new TDataSink(TDataSinkType.ICEBERG_TABLE_SINK); + tDataSink.setIcebergTableSink(tSink); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 540c3598d0c2a2..705fbbec16590a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -93,6 +93,7 @@ import org.apache.doris.thrift.TFileScanRange; import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.THivePartitionUpdate; +import org.apache.doris.thrift.TIcebergCommitData; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloScanRange; import org.apache.doris.thrift.TPipelineFragmentParams; @@ -246,6 +247,9 @@ public class Coordinator implements CoordInterface { // Collect all hivePartitionUpdates obtained from be Consumer> hivePartitionUpdateFunc; + // Collect all icebergCommitData obtained from be + Consumer> icebergCommitDataFunc; + // Input parameter private long jobId = -1; // job which this task belongs to private TUniqueId queryId; @@ -2476,6 +2480,10 @@ public void setHivePartitionUpdateFunc(Consumer> hive this.hivePartitionUpdateFunc = hivePartitionUpdateFunc; } + public void setIcebergCommitDataFunc(Consumer> icebergCommitDataFunc) { + this.icebergCommitDataFunc = icebergCommitDataFunc; + } + // update job progress from BE public void updateFragmentExecStatus(TReportExecStatusParams params) { if (enablePipelineXEngine) { @@ -2526,6 +2534,9 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) { if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) { hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates()); } + if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) { + icebergCommitDataFunc.accept(params.getIcebergCommitDatas()); + } Preconditions.checkArgument(params.isSetDetailedReport()); if (ctx.done) { @@ -2591,6 +2602,9 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) { if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) { hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates()); } + if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) { + icebergCommitDataFunc.accept(params.getIcebergCommitDatas()); + } if (LOG.isDebugEnabled()) { LOG.debug("Query {} instance {} is marked done", DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId())); @@ -2663,6 +2677,9 @@ public void 
updateFragmentExecStatus(TReportExecStatusParams params) { if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) { hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates()); } + if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) { + icebergCommitDataFunc.accept(params.getIcebergCommitDatas()); + } instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java new file mode 100644 index 00000000000000..3d6486f939169e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.transaction; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.iceberg.IcebergMetadataOps; +import org.apache.doris.datasource.iceberg.IcebergTransaction; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class IcebergTransactionManager implements TransactionManager { + + private final Map transactions = new ConcurrentHashMap<>(); + private final IcebergMetadataOps ops; + + public IcebergTransactionManager(IcebergMetadataOps ops) { + this.ops = ops; + } + + @Override + public long begin() { + long id = Env.getCurrentEnv().getNextId(); + IcebergTransaction icebergTransaction = new IcebergTransaction(ops); + transactions.put(id, icebergTransaction); + return id; + } + + @Override + public void commit(long id) throws UserException { + getTransactionWithException(id).commit(); + transactions.remove(id); + } + + @Override + public void rollback(long id) { + getTransactionWithException(id).rollback(); + transactions.remove(id); + } + + @Override + public Transaction getTransaction(long id) { + return getTransactionWithException(id); + } + + public Transaction getTransactionWithException(long id) { + Transaction icebergTransaction = transactions.get(id); + if (icebergTransaction == null) { + throw new RuntimeException("Can't find transaction for " + id); + } + return icebergTransaction; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java index 394494a129d3b5..b8898d9b279e32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java @@ -18,6 +18,7 @@ package org.apache.doris.transaction; import 
org.apache.doris.datasource.hive.HiveMetadataOps; +import org.apache.doris.datasource.iceberg.IcebergMetadataOps; import org.apache.doris.fs.FileSystemProvider; import java.util.concurrent.Executor; @@ -28,4 +29,8 @@ public static TransactionManager createHiveTransactionManager(HiveMetadataOps op FileSystemProvider fileSystemProvider, Executor fileSystemExecutor) { return new HiveTransactionManager(ops, fileSystemProvider, fileSystemExecutor); } + + public static TransactionManager createIcebergTransactionManager(IcebergMetadataOps ops) { + return new IcebergTransactionManager(ops); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveDDLAndDMLPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveDDLAndDMLPlanTest.java index dc35d38af68241..48ef7c1f67a914 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveDDLAndDMLPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveDDLAndDMLPlanTest.java @@ -82,7 +82,7 @@ public class HiveDDLAndDMLPlanTest extends TestWithFeService { private List checkedHiveCols; private final Set createdDbs = new HashSet<>(); - private final Set createdTables = new HashSet<>(); + private final Set createdTables = new HashSet<>(); @Override protected void runBeforeAll() throws Exception { @@ -142,7 +142,12 @@ public Database getDatabase(String dbName) { @Mock public boolean tableExists(String dbName, String tblName) { - return createdTables.contains(tblName); + for (Table table : createdTables) { + if (table.getDbName().equals(dbName) && table.getTableName().equals(tblName)) { + return true; + } + } + return false; } @Mock @@ -154,7 +159,7 @@ public List getAllDatabases() { public void createTable(TableMetadata tbl, boolean ignoreIfExists) { if (tbl instanceof HiveTableMetadata) { Table table = HiveUtil.toHiveTable((HiveTableMetadata) tbl); - createdTables.add(table.getTableName()); + createdTables.add(table); if (checkedHiveCols == null) { // if checkedHiveCols is null, skip column check return; @@ -169,6 +174,16 @@ public void createTable(TableMetadata tbl, boolean ignoreIfExists) { } } } + + @Mock + public Table getTable(String dbName, String tblName) { + for (Table createdTable : createdTables) { + if (createdTable.getDbName().equals(dbName) && createdTable.getTableName().equals(tblName)) { + return createdTable; + } + } + return null; + } }; CreateDbStmt createDbStmt = new CreateDbStmt(true, new DbName("hive", mockedDbName), dbProps); Env.getCurrentEnv().createDb(createDbStmt); @@ -213,8 +228,10 @@ public ExternalDatabase getDbNullable(String dbName) { // mock after ThriftHMSCachedClient is mocked @Mock HMSExternalTable getTableNullable(String tableName) { - if (createdTables.contains(tableName)) { - return new HMSExternalTable(0, tableName, mockedDbName, hmsExternalCatalog); + for (Table table : createdTables) { + if (table.getTableName().equals(tableName)) { + return new HMSExternalTable(0, tableName, mockedDbName, hmsExternalCatalog); + } } return null; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java index e441262f12e2dd..8644ee0ebdc8ec 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java @@ -178,7 +178,6 @@ public void testAppendPartitionForUnPartitionedTable() throws IOException { assertNumRows(3, table); genQueryID(); - 
         List<THivePartitionUpdate> pus2 = new ArrayList<>();
         pus2.add(createRandomAppend(null));
         pus2.add(createRandomAppend(null));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
new file mode 100644
index 00000000000000..10de5427902c16
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
@@ -0,0 +1,328 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.thrift.TFileContent;
+import org.apache.doris.thrift.TIcebergCommitData;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.UnboundPredicate;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class IcebergTransactionTest {
+
+    public static String dbName = "db3";
+    public static String tbWithPartition = "tbWithPartition";
+    public static String tbWithoutPartition = "tbWithoutPartition";
+    public static IcebergMetadataOps ops;
+    public static Schema schema;
+
+    @BeforeClass
+    public static void beforeClass() throws IOException {
+        createCatalog();
+        createTable();
+    }
+
+    public static void createCatalog() throws IOException {
+        Path warehousePath = Files.createTempDirectory("test_warehouse_");
+        String warehouse = "file://" + warehousePath.toAbsolutePath() + "/";
+        HadoopCatalog hadoopCatalog = new HadoopCatalog();
+        Map<String, String> props = new HashMap<>();
+        props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
+        hadoopCatalog.setConf(new Configuration());
+        hadoopCatalog.initialize("df", props);
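+        // note: only the underlying Iceberg HadoopCatalog is exercised by this test,
+        // so the Doris catalog argument of IcebergMetadataOps is left as null below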
+        ops = new IcebergMetadataOps(null, hadoopCatalog);
+    }
+
+    public static void createTable() throws IOException {
+        HadoopCatalog icebergCatalog = (HadoopCatalog) ops.getCatalog();
+        icebergCatalog.createNamespace(Namespace.of(dbName));
+        schema = new Schema(
+                Types.NestedField.required(11, "ts1", Types.TimestampType.withoutZone()),
+                Types.NestedField.required(12, "ts2", Types.TimestampType.withoutZone()),
+                Types.NestedField.required(13, "ts3", Types.TimestampType.withoutZone()),
+                Types.NestedField.required(14, "ts4", Types.TimestampType.withoutZone()),
+                Types.NestedField.required(15, "dt1", Types.DateType.get()),
+                Types.NestedField.required(16, "dt2", Types.DateType.get()),
+                Types.NestedField.required(17, "dt3", Types.DateType.get()),
+                Types.NestedField.required(18, "dt4", Types.DateType.get()),
+                Types.NestedField.required(19, "str1", Types.StringType.get()),
+                Types.NestedField.required(20, "str2", Types.StringType.get()),
+                Types.NestedField.required(21, "int1", Types.IntegerType.get()),
+                Types.NestedField.required(22, "int2", Types.IntegerType.get())
+        );
+
+        PartitionSpec partitionSpec = PartitionSpec.builderFor(schema)
+                .year("ts1")
+                .month("ts2")
+                .day("ts3")
+                .hour("ts4")
+                .year("dt1")
+                .month("dt2")
+                .day("dt3")
+                .identity("dt4")
+                .identity("str1")
+                .truncate("str2", 10)
+                .bucket("int1", 2)
+                .build();
+        icebergCatalog.createTable(TableIdentifier.of(dbName, tbWithPartition), schema, partitionSpec);
+        icebergCatalog.createTable(TableIdentifier.of(dbName, tbWithoutPartition), schema);
+    }
+
+    public List<String> createPartitionValues() {
+
+        Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z");
+        long ts = DateTimeUtil.microsFromInstant(instant);
+        int dt = DateTimeUtil.daysFromInstant(instant);
+
+        List<String> partitionValues = new ArrayList<>();
+
+        // reference: org.apache.iceberg.transforms.Timestamps
+        partitionValues.add(Integer.valueOf(DateTimeUtil.microsToYears(ts)).toString());
+        partitionValues.add(Integer.valueOf(DateTimeUtil.microsToMonths(ts)).toString());
+        partitionValues.add("2024-12-11");
+        partitionValues.add(Integer.valueOf(DateTimeUtil.microsToHours(ts)).toString());
+
+        // reference: org.apache.iceberg.transforms.Dates
+        partitionValues.add(Integer.valueOf(DateTimeUtil.daysToYears(dt)).toString());
+        partitionValues.add(Integer.valueOf(DateTimeUtil.daysToMonths(dt)).toString());
+        partitionValues.add("2024-12-11");
+
+        // identity dt4
+        partitionValues.add("2024-12-11");
+        // identity str1
+        partitionValues.add("2024-12-11");
+        // truncate str2
+        partitionValues.add("2024-12-11");
+        // bucket int1
+        partitionValues.add("1");
+
+        return partitionValues;
+    }
+
+    @Test
+    public void testPartitionedTable() throws UserException {
+        List<String> partitionValues = createPartitionValues();
+
+        List<TIcebergCommitData> ctdList = new ArrayList<>();
+        TIcebergCommitData ctd1 = new TIcebergCommitData();
+        ctd1.setFilePath("f1.parquet");
+        ctd1.setPartitionValues(partitionValues);
+        ctd1.setFileContent(TFileContent.DATA);
+        ctd1.setRowCount(2);
+        ctd1.setFileSize(2);
+
+        TIcebergCommitData ctd2 = new TIcebergCommitData();
+        ctd2.setFilePath("f2.parquet");
+        ctd2.setPartitionValues(partitionValues);
+        ctd2.setFileContent(TFileContent.DATA);
+        ctd2.setRowCount(4);
+        ctd2.setFileSize(4);
+
+        ctdList.add(ctd1);
+        ctdList.add(ctd2);
+
+        IcebergTransaction txn = getTxn();
+        txn.updateIcebergCommitData(ctdList);
+        txn.beginInsert(dbName, tbWithPartition);
+        txn.finishInsert();
+        txn.commit();
+        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithPartition));
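+        // two data files were committed with 2 + 4 rows and 2 + 4 bytes in total, so the new
+        // snapshot summary should report 6 added records, 2 added data files and 6 bytes added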
+        checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
+
+        checkPushDownByPartitionForTs(table, "ts1");
+        checkPushDownByPartitionForTs(table, "ts2");
+        checkPushDownByPartitionForTs(table, "ts3");
+        checkPushDownByPartitionForTs(table, "ts4");
+
+        checkPushDownByPartitionForDt(table, "dt1");
+        checkPushDownByPartitionForDt(table, "dt2");
+        checkPushDownByPartitionForDt(table, "dt3");
+        checkPushDownByPartitionForDt(table, "dt4");
+
+        checkPushDownByPartitionForString(table, "str1");
+        checkPushDownByPartitionForString(table, "str2");
+
+        checkPushDownByPartitionForBucketInt(table, "int1");
+    }
+
+    public void checkPushDownByPartitionForBucketInt(Table table, String column) {
+        // (BucketUtil.hash(15) & Integer.MAX_VALUE) % 2 = 0
+        Integer i1 = 15;
+
+        UnboundPredicate<Integer> lessThan = Expressions.lessThan(column, i1);
+        checkPushDownByPartition(table, lessThan, 2);
+        // can only filter this case
+        UnboundPredicate<Integer> equal = Expressions.equal(column, i1);
+        checkPushDownByPartition(table, equal, 0);
+        UnboundPredicate<Integer> greaterThan = Expressions.greaterThan(column, i1);
+        checkPushDownByPartition(table, greaterThan, 2);
+
+        // (BucketUtil.hash(25) & Integer.MAX_VALUE) % 2 = 1
+        Integer i2 = 25;
+
+        UnboundPredicate<Integer> lessThan2 = Expressions.lessThan(column, i2);
+        checkPushDownByPartition(table, lessThan2, 2);
+        UnboundPredicate<Integer> equal2 = Expressions.equal(column, i2);
+        checkPushDownByPartition(table, equal2, 2);
+        UnboundPredicate<Integer> greaterThan2 = Expressions.greaterThan(column, i2);
+        checkPushDownByPartition(table, greaterThan2, 2);
+    }
+
+    public void checkPushDownByPartitionForString(Table table, String column) {
+        // Since the string used to create the partition is in date format, the date check can be reused directly
+        checkPushDownByPartitionForDt(table, column);
+    }
+
+    public void checkPushDownByPartitionForTs(Table table, String column) {
+        String lessTs = "2023-12-11T12:34:56.123456";
+        String eqTs = "2024-12-11T12:34:56.123456";
+        String greaterTs = "2025-12-11T12:34:56.123456";
+
+        UnboundPredicate<String> lessThan = Expressions.lessThan(column, lessTs);
+        checkPushDownByPartition(table, lessThan, 0);
+        UnboundPredicate<String> equal = Expressions.equal(column, eqTs);
+        checkPushDownByPartition(table, equal, 2);
+        UnboundPredicate<String> greaterThan = Expressions.greaterThan(column, greaterTs);
+        checkPushDownByPartition(table, greaterThan, 0);
+    }
+
+    public void checkPushDownByPartitionForDt(Table table, String column) {
+        String less = "2023-12-11";
+        String eq = "2024-12-11";
+        String greater = "2025-12-11";
+
+        UnboundPredicate<String> lessThan = Expressions.lessThan(column, less);
+        checkPushDownByPartition(table, lessThan, 0);
+        UnboundPredicate<String> equal = Expressions.equal(column, eq);
+        checkPushDownByPartition(table, equal, 2);
+        UnboundPredicate<String> greaterThan = Expressions.greaterThan(column, greater);
+        checkPushDownByPartition(table, greaterThan, 0);
+    }
+
+    public void checkPushDownByPartition(Table table, Expression expr, Integer expectFiles) {
+        CloseableIterable<FileScanTask> fileScanTasks = table.newScan().filter(expr).planFiles();
+        AtomicReference<Integer> cnt = new AtomicReference<>(0);
+        fileScanTasks.forEach(notUse -> cnt.updateAndGet(v -> v + 1));
+        Assert.assertEquals(expectFiles, cnt.get());
+    }
+
+    @Test
+    public void testUnPartitionedTable() throws UserException {
+        ArrayList<TIcebergCommitData> ctdList = new ArrayList<>();
+        TIcebergCommitData ctd1 = new TIcebergCommitData();
+        ctd1.setFilePath("f1.parquet");
+        ctd1.setFileContent(TFileContent.DATA);
+        ctd1.setRowCount(2);
+        ctd1.setFileSize(2);
+
+        TIcebergCommitData ctd2 = new TIcebergCommitData();
+        ctd2.setFilePath("f1.parquet");
+        ctd2.setFileContent(TFileContent.DATA);
+        ctd2.setRowCount(4);
+        ctd2.setFileSize(4);
+
+        ctdList.add(ctd1);
+        ctdList.add(ctd2);
+
+        IcebergTransaction txn = getTxn();
+        txn.updateIcebergCommitData(ctdList);
+        txn.beginInsert(dbName, tbWithoutPartition);
+        txn.finishInsert();
+        txn.commit();
+
+        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
+        checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
+    }
+
+    public void checkSnapshotProperties(Map<String, String> props,
+                                        String addRecords,
+                                        String addFileCnt,
+                                        String addFileSize) {
+        Assert.assertEquals(addRecords, props.get("added-records"));
+        Assert.assertEquals(addFileCnt, props.get("added-data-files"));
+        Assert.assertEquals(addFileSize, props.get("added-files-size"));
+    }
+
+    public String numToYear(Integer num) {
+        Transform year = Transforms.year();
+        return year.toHumanString(Types.IntegerType.get(), num);
+    }
+
+    public String numToMonth(Integer num) {
+        Transform month = Transforms.month();
+        return month.toHumanString(Types.IntegerType.get(), num);
+    }
+
+    public String numToDay(Integer num) {
+        Transform day = Transforms.day();
+        return day.toHumanString(Types.IntegerType.get(), num);
+    }
+
+    public String numToHour(Integer num) {
+        Transform hour = Transforms.hour();
+        return hour.toHumanString(Types.IntegerType.get(), num);
+    }
+
+    @Test
+    public void testTransform() {
+        Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z");
+        long ts = DateTimeUtil.microsFromInstant(instant);
+        Assert.assertEquals("2024", numToYear(DateTimeUtil.microsToYears(ts)));
+        Assert.assertEquals("2024-12", numToMonth(DateTimeUtil.microsToMonths(ts)));
+        Assert.assertEquals("2024-12-11", numToDay(DateTimeUtil.microsToDays(ts)));
+        Assert.assertEquals("2024-12-11-12", numToHour(DateTimeUtil.microsToHours(ts)));
+
+        int dt = DateTimeUtil.daysFromInstant(instant);
+        Assert.assertEquals("2024", numToYear(DateTimeUtil.daysToYears(dt)));
+        Assert.assertEquals("2024-12", numToMonth(DateTimeUtil.daysToMonths(dt)));
+        Assert.assertEquals("2024-12-11", numToDay(dt));
+    }
+
+    public IcebergTransaction getTxn() {
+        return new IcebergTransaction(ops);
+    }
+}
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 834c9025cfdc1d..e613ef788b15cc 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -39,6 +39,7 @@ enum TDataSinkType {
     GROUP_COMMIT_OLAP_TABLE_SINK, // deprecated
     GROUP_COMMIT_BLOCK_SINK,
     HIVE_TABLE_SINK,
+    ICEBERG_TABLE_SINK,
 }
 
 enum TResultSinkType {
@@ -355,6 +356,43 @@ struct THivePartitionUpdate {
     7: optional list<TS3MPUPendingUpload> s3_mpu_pending_uploads
 }
 
+enum TFileContent {
+    DATA = 0,
+    POSITION_DELETES = 1,
+    EQUALITY_DELETES = 2
+}
+
+struct TIcebergCommitData {
+    1: optional string file_path
+    2: optional i64 row_count
+    3: optional i64 file_size
+    4: optional TFileContent file_content
+    5: optional list<string> partition_values
+    6: optional list<string> referenced_data_files
+}
+
+struct TSortField {
+    1: optional i32 source_column_id
+    2: optional bool ascending
+    3: optional bool null_first
+}
+
+struct TIcebergTableSink {
+    1: optional string db_name
+    2: optional string tb_name
+    3: optional string schema_json
+    4: optional map<i32, string> partition_specs_json
+    5: optional i32 partition_spec_id
+    6: optional list<TSortField> sort_fields
+    7: optional PlanNodes.TFileFormatType file_format
+    8: optional string output_path
+    9: optional map<string, string> hadoop_config
+    10: optional bool overwrite
+    11: optional Types.TFileType file_type
+    12: optional string original_output_path
+    13: optional PlanNodes.TFileCompressType compression_type
+}
+
 struct TDataSink {
     1: required TDataSinkType type
     2: optional TDataStreamSink stream_sink
@@ -368,4 +406,5 @@ struct TDataSink {
     11: optional TJdbcTableSink jdbc_table_sink
     12: optional TMultiCastDataStreamSink multi_cast_stream_sink
     13: optional THiveTableSink hive_table_sink
+    14: optional TIcebergTableSink iceberg_table_sink
 }
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index d2996f2a5dcc07..7c1c2360e5e4c6 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -357,11 +357,11 @@ struct TListTableStatusResult {
 
 struct TTableMetadataNameIds {
     1: optional string name
-    2: optional i64 id
+    2: optional i64 id
 }
 
 struct TListTableMetadataNameIdsResult {
-    1: optional list<TTableMetadataNameIds> tables
+    1: optional list<TTableMetadataNameIds> tables
 }
 
 // getTableNames returns a list of unqualified table names
@@ -484,6 +484,8 @@ struct TReportExecStatusParams {
     25: optional TReportWorkloadRuntimeStatusParams report_workload_runtime_status
 
     26: optional list<DataSinks.THivePartitionUpdate> hive_partition_updates
+
+    27: optional list<DataSinks.TIcebergCommitData> iceberg_commit_datas
 }
 
 struct TFeResult {