From e73e53f0a12b8896e72e1330ffcfc87991186afd Mon Sep 17 00:00:00 2001 From: qijianliang01 Date: Fri, 18 Feb 2022 10:55:31 +0800 Subject: [PATCH 1/2] support query iceberg external table --- be/src/runtime/descriptors.cpp | 28 +++++ be/src/runtime/descriptors.h | 18 +++ fe/fe-core/pom.xml | 8 ++ .../org/apache/doris/catalog/HiveTable.java | 2 +- .../apache/doris/catalog/IcebergTable.java | 108 ++++++++++++++-- .../apache/doris/load/BrokerFileGroup.java | 11 ++ .../org/apache/doris/load/LoadChecker.java | 2 +- .../apache/doris/planner/BrokerScanNode.java | 28 +++-- .../doris/planner/DistributedPlanner.java | 2 +- .../apache/doris/planner/IcebergScanNode.java | 117 ++++++++++++++++++ .../doris/planner/SingleNodePlanner.java | 4 + .../org/apache/doris/qe/AuditLogBuilder.java | 4 +- .../doris/catalog/CreateTableLikeTest.java | 3 +- .../apache/doris/plugin/PluginMgrTest.java | 2 +- gensrc/thrift/Types.thrift | 4 +- 15 files changed, 306 insertions(+), 35 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 2e7041b4936043..eb31853a315f40 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -148,6 +148,28 @@ std::string BrokerTableDescriptor::debug_string() const { return out.str(); } +HiveTableDescriptor::HiveTableDescriptor(const TTableDescriptor& tdesc) + : TableDescriptor(tdesc) {} + +HiveTableDescriptor::~HiveTableDescriptor() {} + +std::string HiveTableDescriptor::debug_string() const { + std::stringstream out; + out << "HiveTable(" << TableDescriptor::debug_string() << ")"; + return out.str(); +} + +IcebergTableDescriptor::IcebergTableDescriptor(const TTableDescriptor& tdesc) + : TableDescriptor(tdesc) {} + +IcebergTableDescriptor::~IcebergTableDescriptor() {} + +std::string IcebergTableDescriptor::debug_string() const { + std::stringstream out; + out << "IcebergTable(" << TableDescriptor::debug_string() << ")"; + return out.str(); +} + EsTableDescriptor::EsTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc) {} EsTableDescriptor::~EsTableDescriptor() {} @@ -524,6 +546,12 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb case TTableType::ES_TABLE: desc = pool->add(new EsTableDescriptor(tdesc)); break; + case TTableType::HIVE_TABLE: + desc = pool->add(new HiveTableDescriptor(tdesc)); + break; + case TTableType::ICEBERG_TABLE: + desc = pool->add(new IcebergTableDescriptor(tdesc)); + break; default: DCHECK(false) << "invalid table type: " << tdesc.tableType; } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 708a1542256909..677271f7e7110d 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -190,6 +190,24 @@ class BrokerTableDescriptor : public TableDescriptor { private: }; +class HiveTableDescriptor : public TableDescriptor { +public: + HiveTableDescriptor(const TTableDescriptor& tdesc); + virtual ~HiveTableDescriptor(); + virtual std::string debug_string() const; + +private: +}; + +class IcebergTableDescriptor : public TableDescriptor { +public: + IcebergTableDescriptor(const TTableDescriptor& tdesc); + virtual ~IcebergTableDescriptor(); + virtual std::string debug_string() const; + +private: +}; + class EsTableDescriptor : public TableDescriptor { public: EsTableDescriptor(const TTableDescriptor& tdesc); diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 9b1110859bdd7e..25df902290ed28 100644 --- 
a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -599,6 +599,14 @@ under the License. iceberg-hive-metastore provided + + + + org.apache.avro + avro + 1.10.1 + + palo-fe diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java index 236d2660a44354..ff4adc001fb57e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java @@ -134,7 +134,7 @@ public void readFields(DataInput in) throws IOException { @Override public TTableDescriptor toThrift() { THiveTable tHiveTable = new THiveTable(getHiveDb(), getHiveTable(), getHiveProperties()); - TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE, + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, fullSchema.size(), 0, getName(), ""); tTableDescriptor.setHiveTable(tHiveTable); return tTableDescriptor; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java index c16493690f9711..90383813e631c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java @@ -18,21 +18,36 @@ package org.apache.doris.catalog; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; +import org.apache.doris.external.iceberg.IcebergCatalog; +import org.apache.doris.external.iceberg.IcebergCatalogMgr; +import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TIcebergTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Expression; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * External Iceberg table @@ -44,10 +59,17 @@ public class IcebergTable extends Table { private String icebergDb; // remote Iceberg table name private String icebergTbl; + // remote Iceberg table location + private String location; + // Iceberg table file format + private String fileFormat; private Map icebergProperties = Maps.newHashMap(); private org.apache.iceberg.Table icebergTable; + private final byte[] loadLock = new byte[0]; + private final AtomicBoolean isLoaded = new AtomicBoolean(false); + public IcebergTable() { super(TableType.ICEBERG); } @@ -73,28 +95,88 @@ public String getIcebergDb() { return icebergDb; } - public void setIcebergDb(String icebergDb) { - this.icebergDb = icebergDb; - } - public String getIcebergTbl() { return icebergTbl; } - public void setIcebergTbl(String icebergTbl) { - this.icebergTbl = icebergTbl; - } - public Map getIcebergProperties() { return icebergProperties; } - public void setIcebergProperties(Map icebergProperties) { - this.icebergProperties = icebergProperties; + public String 
getLocation() throws UserException { + if (Strings.isNullOrEmpty(location)) { + try { + getTable(); + } catch (Exception e) { + throw new UserException("Failed to get table: " + name + ",error: " + e.getMessage()); + } + location = icebergTable.location(); + } + return location; + } + + public String getFileFormat() throws UserException { + if (Strings.isNullOrEmpty(fileFormat)) { + try { + getTable(); + } catch (Exception e) { + throw new UserException("Failed to get table: " + name + ",error: " + e.getMessage()); + } + fileFormat = icebergTable.properties().get(TableProperties.DEFAULT_FILE_FORMAT); + } + return fileFormat; } - public org.apache.iceberg.Table getIcebergTable() { - return icebergTable; + // get the iceberg table instance, if table is not loaded, load it. + private org.apache.iceberg.Table getTable() throws Exception { + if (isLoaded.get()) { + Preconditions.checkNotNull(icebergTable); + return icebergTable; + } + synchronized (loadLock) { + if (icebergTable != null) { + return icebergTable; + } + + IcebergProperty icebergProperty = getIcebergProperty(); + IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty); + try { + this.icebergTable = icebergCatalog.loadTable(TableIdentifier.of(icebergDb, icebergTbl)); + LOG.info("finished to load table: {}", name); + } catch (Exception e) { + LOG.warn("failed to load table {} from {}", name, icebergProperty.getHiveMetastoreUris(), e); + throw e; + } + + isLoaded.set(true); + return icebergTable; + } + } + + private IcebergProperty getIcebergProperty() { + Map properties = Maps.newHashMap(icebergProperties); + properties.put(IcebergProperty.ICEBERG_DATABASE, icebergDb); + properties.put(IcebergProperty.ICEBERG_TABLE, icebergTbl); + return new IcebergProperty(properties); + } + + /** + * Get iceberg data file by file system table location and iceberg predicates + * @throws Exception + */ + public List getIcebergDataFiles(List predicates) throws Exception { + org.apache.iceberg.Table table = getTable(); + TableScan scan = table.newScan(); + for (Expression predicate : predicates) { + scan = scan.filter(predicate); + } + List relatedFiles = Lists.newArrayList(); + for (FileScanTask task : scan.planFiles()) { + Path path = Paths.get(task.file().path().toString()); + String relativePath = "/" + path.subpath(2, path.getNameCount()); + relatedFiles.add(new TBrokerFileStatus(relativePath, false, task.file().fileSizeInBytes(), false)); + } + return relatedFiles; } @Override @@ -128,7 +210,7 @@ public void readFields(DataInput in) throws IOException { @Override public TTableDescriptor toThrift() { TIcebergTable tIcebergTable = new TIcebergTable(getIcebergDb(), getIcebergTbl(), getIcebergProperties()); - TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE, + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE, fullSchema.size(), 0, getName(), ""); tTableDescriptor.setIcebergTable(tIcebergTable); return tTableDescriptor; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 95dbc125969630..b5312dfa8ace1a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.HiveTable; +import org.apache.doris.catalog.IcebergTable; 
import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.OlapTable.OlapTableState; @@ -37,6 +38,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.load.loadv2.LoadTask; @@ -137,6 +139,15 @@ public BrokerFileGroup(HiveTable table, this.columnExprList = columnExprList; } + // Used for iceberg table, no need to parse + public BrokerFileGroup(IcebergTable table) throws UserException { + this.tableId = table.getId(); + this.isNegative = false; + this.valueSeparator = "|"; + this.lineDelimiter = "\n"; + this.fileFormat = table.getFileFormat(); + } + public BrokerFileGroup(DataDescription dataDescription) { this.fileFieldNames = dataDescription.getFileFieldNames(); this.columnsFromPath = dataDescription.getColumnsFromPath(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java index e808ec3c05c398..415afe77aa3f5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java @@ -63,7 +63,7 @@ import java.util.Map.Entry; import java.util.Set; -import avro.shaded.com.google.common.collect.Lists; +import com.google.common.collect.Lists; public class LoadChecker extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(LoadChecker.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 9f75814163932c..12f01b8e1fc51d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -401,7 +401,7 @@ private void assignBackends() throws UserException { Collections.shuffle(backends, random); } - private TFileFormatType formatType(String fileFormat, String path) { + private TFileFormatType formatType(String fileFormat, String path) throws UserException { if (fileFormat != null) { if (fileFormat.toLowerCase().equals("parquet")) { return TFileFormatType.FORMAT_PARQUET; @@ -411,6 +411,8 @@ private TFileFormatType formatType(String fileFormat, String path) { return TFileFormatType.FORMAT_JSON; } else if (fileFormat.toLowerCase().equals("csv")) { return TFileFormatType.FORMAT_CSV_PLAIN; + } else { + throw new UserException("Not supported file format: " + fileFormat); } } @@ -432,6 +434,10 @@ private TFileFormatType formatType(String fileFormat, String path) { } } + public String getHdfsUri() throws UserException { + return ""; + } + // If fileFormat is not null, we use fileFormat instead of check file's suffix private void processFileGroup( ParamCreateContext context, @@ -440,11 +446,11 @@ private void processFileGroup( if (fileStatuses == null || fileStatuses.isEmpty()) { return; } + // set hdfs params, used to Hive and Iceberg scan THdfsParams tHdfsParams = new THdfsParams(); - if (this instanceof HiveScanNode) { - String fsName = ((HiveScanNode) this).getHdfsUri(); - tHdfsParams.setFsName(fsName); - } + String fsName = getHdfsUri(); + tHdfsParams.setFsName(fsName); + TScanRangeLocations curLocations = newLocations(context.params, brokerDesc); long curInstanceBytes = 0; long curFileOffset = 0; @@ -477,10 +483,8 @@ private 
void processFileGroup( } else { TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, leftBytes, columnsFromPath, numberOfColumnsFromFile, brokerDesc); - if (this instanceof HiveScanNode) { - rangeDesc.setHdfsParams(tHdfsParams); - rangeDesc.setReadByColumnDef(true); - } + rangeDesc.setHdfsParams(tHdfsParams); + rangeDesc.setReadByColumnDef(true); brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; i++; @@ -502,10 +506,8 @@ private void processFileGroup( rangeDesc.setNumAsString(context.fileGroup.isNumAsString()); rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine()); } - if (this instanceof HiveScanNode) { - rangeDesc.setHdfsParams(tHdfsParams); - rangeDesc.setReadByColumnDef(true); - } + rangeDesc.setHdfsParams(tHdfsParams); + rangeDesc.setReadByColumnDef(true); brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; curInstanceBytes += leftBytes; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 8c209a67d51562..f9a665b63b6507 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -50,7 +50,7 @@ import java.util.Map; import java.util.stream.Collectors; -import avro.shaded.com.google.common.collect.Maps; +import com.google.common.collect.Maps; /** * The distributed planner is responsible for creating an executable, distributed plan diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java new file mode 100644 index 00000000000000..deec558d913f9d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
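+//
+// IcebergScanNode lets Doris query an Iceberg external table by reusing
+// the BrokerScanNode machinery: initFileGroup() wraps the IcebergTable
+// in a single BrokerFileGroup, getFileStatus() converts the planner's
+// conjuncts into Iceberg Expressions (IcebergUtils.convertToIcebergExpr)
+// and asks the table for the matching data files, and getHdfsUri()
+// derives the filesystem name from the table location so the BE can
+// read the files directly.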
+ +package org.apache.doris.planner; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.IcebergProperty; +import org.apache.doris.catalog.IcebergTable; +import org.apache.doris.common.UserException; +import org.apache.doris.external.iceberg.util.IcebergUtils; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TExplainLevel; + +import org.apache.iceberg.expressions.Expression; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +public class IcebergScanNode extends BrokerScanNode { + private static final Logger LOG = LogManager.getLogger(IcebergScanNode.class); + + private IcebergTable icebergTable; + private final List icebergPredicates = new ArrayList<>(); + private String hdfsUri; + + public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, + List> fileStatusesList, int filesAdded) { + super(id, desc, planNodeName, fileStatusesList, filesAdded); + icebergTable = (IcebergTable) desc.getTable(); + } + + @Override + public void init(Analyzer analyzer) throws UserException { + super.init(analyzer); + } + + @Override + protected void initFileGroup() throws UserException { + fileGroups = Lists.newArrayList(new BrokerFileGroup(icebergTable)); + brokerDesc = new BrokerDesc("IcebergTableDesc", StorageBackend.StorageType.HDFS, icebergTable.getIcebergProperties()); + targetTable = icebergTable; + } + + @Override + public String getHdfsUri() throws UserException { + if (Strings.isNullOrEmpty(hdfsUri)) { + String location = icebergTable.getLocation(); + String[] strings = StringUtils.split(location, "/"); + String[] strs = StringUtils.split(strings[1], ":"); + this.hdfsUri = "hdfs://" + strs[0] + ":" + strs[1]; + } + return hdfsUri; + } + + @Override + protected void getFileStatus() throws UserException { + // extract iceberg conjuncts + ListIterator it = conjuncts.listIterator(); + while (it.hasNext()) { + Expression expression = IcebergUtils.convertToIcebergExpr(it.next()); + if (expression != null) { + icebergPredicates.add(expression); + } + } + // get iceberg file status + List fileStatuses; + try { + fileStatuses = icebergTable.getIcebergDataFiles(icebergPredicates); + } catch (Exception e) { + LOG.warn("errors while load iceberg table {} data files.", icebergTable.getName(), e); + throw new UserException("errors while load Iceberg table [" + + icebergTable.getName() + "] data files."); + } + fileStatusesList.add(fileStatuses); + filesAdded += fileStatuses.size(); + for (TBrokerFileStatus fstatus : fileStatuses) { + LOG.info("Add file status is {}", fstatus); + } + } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + if (!isLoad()) { + output.append(prefix).append("TABLE: ").append(icebergTable.getName()).append("\n"); + output.append(prefix).append("PATH: ") + .append(icebergTable.getIcebergProperties().get(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS)) + .append("\n"); + } + return output.toString(); + } +} diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index b3aaf2af43a505..8414da0c485ae3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1698,6 +1698,10 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s scanNode = new HiveScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "HiveScanNode", null, -1); break; + case ICEBERG: + scanNode = new IcebergScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), "IcebergScanNode", + null, -1); + break; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java index c940e0d2f4d99c..ffb645923a7cd8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java @@ -17,8 +17,8 @@ package org.apache.doris.qe; -import avro.shaded.com.google.common.collect.Maps; -import avro.shaded.com.google.common.collect.Sets; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.doris.common.AuditLog; import org.apache.doris.common.Config; import org.apache.doris.common.util.DigitalVersion; diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java index 13a32d5ccb40a9..90e8acc277d874 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java @@ -17,7 +17,6 @@ package org.apache.doris.catalog; -import avro.shaded.com.google.common.collect.Lists; import org.apache.commons.collections.ListUtils; import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateTableLikeStmt; @@ -37,7 +36,7 @@ import java.util.List; import java.util.UUID; -import avro.shaded.com.google.common.collect.Lists; +import com.google.common.collect.Lists; /** * @author wangcong diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java index e5a1885350388d..1f1d012c25cd58 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java @@ -38,7 +38,7 @@ import java.nio.file.Files; import java.util.UUID; -import avro.shaded.com.google.common.collect.Maps; +import com.google.common.collect.Maps; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index c1c8487ab1bc78..88992a737536e8 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -357,7 +357,9 @@ enum TTableType { KUDU_TABLE, // Deprecated BROKER_TABLE, ES_TABLE, - ODBC_TABLE + ODBC_TABLE, + HIVE_TABLE, + ICEBERG_TABLE } enum TOdbcTableType { From 8d090019c7181ab1b7b620544a6bedbc8f8ef6be Mon Sep 17 00:00:00 2001 From: qijianliang01 Date: Thu, 24 Feb 2022 20:32:10 +0800 Subject: [PATCH 2/2] fix reviews Change-Id: I2b7a4ae72df4e24e75d7151ab9644e6adc04d58d --- fe/fe-core/pom.xml | 2 +- .../apache/doris/catalog/IcebergTable.java | 65 ++++++++++++++++--- .../org/apache/doris/load/LoadChecker.java 
| 2 +- .../apache/doris/planner/BrokerScanNode.java | 4 +- .../doris/planner/DistributedPlanner.java | 2 +- .../apache/doris/planner/HiveScanNode.java | 2 +- .../apache/doris/planner/IcebergScanNode.java | 22 ++----- .../org/apache/doris/qe/AuditLogBuilder.java | 5 +- .../doris/catalog/CreateTableLikeTest.java | 6 +- .../apache/doris/plugin/PluginMgrTest.java | 3 +- fe/pom.xml | 12 ++++ 11 files changed, 87 insertions(+), 38 deletions(-) diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 25df902290ed28..a71545cc33f5f5 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -604,7 +604,7 @@ under the License. org.apache.avro avro - 1.10.1 + provided diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java index 90383813e631c1..41a0458f4e01eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java @@ -17,7 +17,7 @@ package org.apache.doris.catalog; - +import org.apache.doris.analysis.StorageBackend; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.external.iceberg.IcebergCatalog; @@ -27,11 +27,7 @@ import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - +import org.apache.commons.lang3.StringUtils; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; @@ -40,6 +36,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -63,6 +64,12 @@ public class IcebergTable extends Table { private String location; // Iceberg table file format private String fileFormat; + // Iceberg storage type + private StorageBackend.StorageType storageType; + // Iceberg remote host uri + private String hostUri; + // location analyze flag + private boolean isAnalyzed = false; private Map icebergProperties = Maps.newHashMap(); private org.apache.iceberg.Table icebergTable; @@ -103,7 +110,7 @@ public Map getIcebergProperties() { return icebergProperties; } - public String getLocation() throws UserException { + private void getLocation() throws UserException { if (Strings.isNullOrEmpty(location)) { try { getTable(); @@ -112,7 +119,45 @@ public String getLocation() throws UserException { } location = icebergTable.location(); } - return location; + analyzeLocation(); + } + + private void analyzeLocation() throws UserException { + if (isAnalyzed) { + return; + } + String[] strings = StringUtils.split(location, "/"); + + // analyze storage type + String storagePrefix = strings[0].split(":")[0]; + if (storagePrefix.equalsIgnoreCase("s3")) { + this.storageType = StorageBackend.StorageType.S3; + } else if (storagePrefix.equalsIgnoreCase("hdfs")) { + this.storageType = StorageBackend.StorageType.HDFS; + } else { + throw new UserException("Not supported storage type: " + storagePrefix); + } + + // analyze host uri + // eg: hdfs://host:port + // s3://host:port + String host = strings[1]; + this.hostUri = storagePrefix + 
"://" + host; + this.isAnalyzed = true; + } + + public String getHostUri() throws UserException { + if (!isAnalyzed) { + getLocation(); + } + return hostUri; + } + + public StorageBackend.StorageType getStorageType() throws UserException { + if (!isAnalyzed) { + getLocation(); + } + return storageType; } public String getFileFormat() throws UserException { @@ -142,9 +187,9 @@ private org.apache.iceberg.Table getTable() throws Exception { IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty); try { this.icebergTable = icebergCatalog.loadTable(TableIdentifier.of(icebergDb, icebergTbl)); - LOG.info("finished to load table: {}", name); + LOG.info("finished to load iceberg table: {}", name); } catch (Exception e) { - LOG.warn("failed to load table {} from {}", name, icebergProperty.getHiveMetastoreUris(), e); + LOG.warn("failed to load iceberg table {} from {}", name, icebergProperty.getHiveMetastoreUris(), e); throw e; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java index 415afe77aa3f5e..f11377e66fac7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java @@ -50,6 +50,7 @@ import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -63,7 +64,6 @@ import java.util.Map.Entry; import java.util.Set; -import com.google.common.collect.Lists; public class LoadChecker extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(LoadChecker.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 12f01b8e1fc51d..bfab5124a9ebf7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -434,7 +434,7 @@ private TFileFormatType formatType(String fileFormat, String path) throws UserEx } } - public String getHdfsUri() throws UserException { + public String getHostUri() throws UserException { return ""; } @@ -448,7 +448,7 @@ private void processFileGroup( } // set hdfs params, used to Hive and Iceberg scan THdfsParams tHdfsParams = new THdfsParams(); - String fsName = getHdfsUri(); + String fsName = getHostUri(); tHdfsParams.setFsName(fsName); TScanRangeLocations curLocations = newLocations(context.params, brokerDesc); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index f9a665b63b6507..eb099c26ec59b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -40,6 +40,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -50,7 +51,6 @@ import java.util.Map; import java.util.stream.Collectors; -import com.google.common.collect.Maps; /** * The distributed planner is responsible for creating an executable, distributed plan diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java index 2619079bfadc11..a67f5394ca187e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java @@ -70,7 +70,7 @@ public class HiveScanNode extends BrokerScanNode { private List partitionKeys = new ArrayList<>(); /* hive table properties */ - public String getHdfsUri() { + public String getHostUri() { return hdfsUri; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java index deec558d913f9d..5428c9ee555ec8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java @@ -17,13 +17,9 @@ package org.apache.doris.planner; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import org.apache.commons.lang3.StringUtils; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.IcebergProperty; import org.apache.doris.catalog.IcebergTable; @@ -37,6 +33,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.google.common.collect.Lists; + import java.util.ArrayList; import java.util.List; import java.util.ListIterator; @@ -46,7 +44,6 @@ public class IcebergScanNode extends BrokerScanNode { private IcebergTable icebergTable; private final List icebergPredicates = new ArrayList<>(); - private String hdfsUri; public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, List> fileStatusesList, int filesAdded) { @@ -62,19 +59,14 @@ public void init(Analyzer analyzer) throws UserException { @Override protected void initFileGroup() throws UserException { fileGroups = Lists.newArrayList(new BrokerFileGroup(icebergTable)); - brokerDesc = new BrokerDesc("IcebergTableDesc", StorageBackend.StorageType.HDFS, icebergTable.getIcebergProperties()); + brokerDesc = new BrokerDesc("IcebergTableDesc", icebergTable.getStorageType(), + icebergTable.getIcebergProperties()); targetTable = icebergTable; } @Override - public String getHdfsUri() throws UserException { - if (Strings.isNullOrEmpty(hdfsUri)) { - String location = icebergTable.getLocation(); - String[] strings = StringUtils.split(location, "/"); - String[] strs = StringUtils.split(strings[1], ":"); - this.hdfsUri = "hdfs://" + strs[0] + ":" + strs[1]; - } - return hdfsUri; + public String getHostUri() throws UserException { + return icebergTable.getHostUri(); } @Override @@ -99,7 +91,7 @@ protected void getFileStatus() throws UserException { fileStatusesList.add(fileStatuses); filesAdded += fileStatuses.size(); for (TBrokerFileStatus fstatus : fileStatuses) { - LOG.info("Add file status is {}", fstatus); + LOG.debug("Add file status is {}", fstatus); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java index ffb645923a7cd8..818e1169116cf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java @@ -17,8 +17,6 @@ package org.apache.doris.qe; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; 
import org.apache.doris.common.AuditLog; import org.apache.doris.common.Config; import org.apache.doris.common.util.DigitalVersion; @@ -34,6 +32,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + import java.lang.reflect.Field; import java.util.Map; import java.util.Set; diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java index 90e8acc277d874..bda5ea67bd9201 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java @@ -17,13 +17,11 @@ package org.apache.doris.catalog; -import org.apache.commons.collections.ListUtils; import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateTableLikeStmt; import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.common.DdlException; import org.apache.doris.common.ExceptionChecker; -import org.apache.doris.common.util.ListUtil; import org.apache.doris.qe.ConnectContext; import org.apache.doris.utframe.UtFrameUtils; @@ -32,12 +30,12 @@ import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.collect.Lists; + import java.io.File; import java.util.List; import java.util.UUID; -import com.google.common.collect.Lists; - /** * @author wangcong * @version 1.0 diff --git a/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java index 1f1d012c25cd58..80edee5047c2e3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/plugin/PluginMgrTest.java @@ -30,6 +30,8 @@ import org.junit.BeforeClass; import org.junit.Test; +import com.google.common.collect.Maps; + import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -38,7 +40,6 @@ import java.nio.file.Files; import java.util.UUID; -import com.google.common.collect.Maps; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; diff --git a/fe/pom.xml b/fe/pom.xml index 80b7bef2445d6b..36dd85c214e127 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -120,7 +120,12 @@ under the License. 2.17.1 0.15-SNAPSHOT github + + 0.12.0 + 1.10.1 + @@ -706,6 +711,13 @@ under the License. ${iceberg.version} + + + org.apache.avro + avro + ${avro.version} + + org.apache.parquet parquet-column
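
For readers who have not used the Iceberg API, the planning path this series adds (IcebergTable.getIcebergDataFiles, called from IcebergScanNode.getFileStatus) reduces to the sketch below. It is a minimal, standalone illustration, not code from the patch: the metastore URI, the db/table names, and the "dt" column are placeholders, and the catalog is built directly with Iceberg's HiveCatalog rather than through IcebergCatalogMgr.

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.hive.HiveCatalog;
    import org.apache.iceberg.io.CloseableIterable;

    import com.google.common.collect.ImmutableMap;

    public class IcebergPlanningSketch {
        public static void main(String[] args) throws Exception {
            // Load the remote table through the Hive catalog, as
            // IcebergCatalogMgr/IcebergCatalog do inside getTable().
            HiveCatalog catalog = new HiveCatalog();
            catalog.initialize("hive",
                    ImmutableMap.of("uri", "thrift://127.0.0.1:9083")); // placeholder URI
            Table table = catalog.loadTable(TableIdentifier.of("db", "tbl")); // placeholder names

            // Push a predicate into file planning, mirroring
            // getIcebergDataFiles(predicates): filter() returns a new
            // TableScan, so the result must be reassigned.
            Expression pred = Expressions.greaterThanOrEqual("dt", "2022-02-18");
            TableScan scan = table.newScan().filter(pred);

            // planFiles() yields only data files whose metadata may match
            // the filter; Doris wraps each one in a TBrokerFileStatus.
            try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
                for (FileScanTask task : tasks) {
                    System.out.println(task.file().path() + " : "
                            + task.file().fileSizeInBytes() + " bytes");
                }
            }
        }
    }

Note that planFiles() prunes on file-level metadata (partition values, column bounds), so it can still return files containing no matching rows; the filter is a pruning guarantee, not an exact one. The patch then strips the scheme and authority from each file path (Paths.subpath) and ships the filesystem name separately through THdfsParams.setFsName(getHostUri()).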