From 86ae20aec07993de3127bfe48aa192df7a3c4847 Mon Sep 17 00:00:00 2001 From: dujunling Date: Sat, 21 May 2022 16:59:34 +0800 Subject: [PATCH 1/8] support query hudi external table --- fe/fe-core/pom.xml | 7 + .../org/apache/doris/analysis/Analyzer.java | 7 + .../org/apache/doris/catalog/Catalog.java | 9 + .../catalog/HiveMetaStoreClientHelper.java | 33 +- .../doris/external/hive/util/HiveUtil.java | 195 ++++++++++ .../apache/doris/external/hudi/HudiTable.java | 2 +- .../apache/doris/external/hudi/HudiUtils.java | 47 ++- .../apache/doris/load/BrokerFileGroup.java | 21 + .../apache/doris/planner/BrokerScanNode.java | 38 +- .../apache/doris/planner/HudiScanNode.java | 364 ++++++++++++++++++ .../doris/planner/SingleNodePlanner.java | 4 + fe/pom.xml | 6 + 12 files changed, 702 insertions(+), 31 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 009cf9df9043af..58e6a8e5497ca8 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -68,6 +68,13 @@ under the License. pom import + + + org.apache.hadoop + hadoop-mapreduce-client + 2.7.4 + compile + diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index a47e8a9b5e9b9d..7478a5290a3f2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -38,6 +38,8 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.VecNotImplException; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.external.hudi.HudiTable; +import org.apache.doris.external.hudi.HudiUtils; import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.RuntimeFilter; import org.apache.doris.qe.ConnectContext; @@ -626,6 +628,11 @@ public TableRef resolveTableRef(TableRef tableRef) throws AnalysisException { } } + if (table.getType() == TableType.HUDI && ((HudiTable) table).getFullSchema().isEmpty()) { + // resolve hudi table's schema when table schema is empty from doris meta + table = HudiUtils.resolveHudiTable((HudiTable)table); + } + // tableName.getTbl() stores the table name specified by the user in the from statement. // In the case of case-sensitive table names, the value of tableName.getTbl() is the same as table.getName(). 
// However, since the system view is not case-sensitive, table.getName() gets the lowercase view name, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 69fe06925830d2..1952689d27bed1 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4142,6 +4142,15 @@ private void createHudiTable(Database db, CreateTableStmt stmt) throws DdlExcept if (!hudiTable.getFullSchema().isEmpty()) { HudiUtils.validateColumns(hudiTable, hiveTable); } + switch (hiveTable.getTableType()) { + case "VIRTUAL_VIEW": + throw new DdlException("VIRTUAL_VIEW table is not supported."); + case "EXTERNAL_TABLE": + case "MANAGED_TABLE": + break; + default: + throw new DdlException("unsupported hudi table type [" + hiveTable.getTableType() + "]."); + } // check hive table if exists in doris database if (!db.createTableWithLock(hudiTable, false, stmt.isSetIfNotExists()).first) { ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index 53e27dfc6c127b..f6bbf8c9d30b62 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -158,7 +158,7 @@ public static void dropClient(HiveMetaStoreClient client) { } /** - * Get data files of partitions in hive table, filter by partition predicate + * Get data files of partitions in hive table, filter by partition predicate. * @param hiveTable * @param hivePartitionPredicate * @param fileStatuses @@ -168,21 +168,11 @@ public static void dropClient(HiveMetaStoreClient client) { */ public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate, List fileStatuses, Table remoteHiveTbl) throws DdlException { - HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS)); - List> remoteIterators; if (remoteHiveTbl.getPartitionKeys().size() > 0) { + String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS); // hive partitioned table, get file iterator from table partition sd info - List hivePartitions = new ArrayList<>(); - try { - client.listPartitionsByExpr(hiveTable.getHiveDb(), hiveTable.getHiveTable(), - SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), null, (short) -1, hivePartitions); - } catch (TException e) { - LOG.warn("Hive metastore thrift exception: {}", e.getMessage()); - throw new DdlException("Connect hive metastore failed. 
Error: " + e.getMessage()); - } finally { - client.close(); - } + List hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, hivePartitionPredicate); remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties()); } else { // hive non-partitioned table, get file iterator from table sd info @@ -219,6 +209,23 @@ public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDe return hdfsUrl; } + public static List getHivePartitions(String metaStoreUris, Table remoteHiveTbl, + ExprNodeGenericFuncDesc hivePartitionPredicate) throws DdlException { + List hivePartitions = new ArrayList<>(); + HiveMetaStoreClient client = getClient(metaStoreUris); + try { + client.listPartitionsByExpr(remoteHiveTbl.getDbName(), remoteHiveTbl.getTableName(), + SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), + null, (short) -1, hivePartitions); + } catch (TException e) { + LOG.warn("Hive metastore thrift exception: {}", e.getMessage()); + throw new DdlException("Connect hive metastore failed."); + } finally { + client.close(); + } + return hivePartitions; + } + private static List> getRemoteIterator(List partitions, Map properties) throws DdlException { List> iterators = new ArrayList<>(); Configuration configuration = new Configuration(false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java new file mode 100644 index 00000000000000..a841ffc9187b02 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.external.hive.util; + +import com.google.common.collect.Lists; +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.UserException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.DateTimeFormatterBuilder; +import org.joda.time.format.DateTimeParser; +import org.joda.time.format.DateTimePrinter; +import org.joda.time.format.ISODateTimeFormat; + +import java.lang.reflect.Field; +import java.util.List; + +public final class HiveUtil { + private static final Logger LOG = LogManager.getLogger(HiveUtil.class); + + private static final DateTimeFormatter HIVE_DATE_PARSER = ISODateTimeFormat.date().withZoneUTC(); + private static final DateTimeFormatter HIVE_TIMESTAMP_PARSER; + private static final Field COMPRESSION_CODECS_FIELD; + + static { + DateTimeParser[] timestampWithoutTimeZoneParser = { + DateTimeFormat.forPattern("yyyy-M-d").getParser(), + DateTimeFormat.forPattern("yyyy-M-d H:m").getParser(), + DateTimeFormat.forPattern("yyyy-M-d H:m:s").getParser(), + DateTimeFormat.forPattern("yyyy-M-d H:m:s.SSS").getParser(), + DateTimeFormat.forPattern("yyyy-M-d H:m:s.SSSSSSS").getParser(), + DateTimeFormat.forPattern("yyyy-M-d H:m:s.SSSSSSSSS").getParser(), + }; + DateTimePrinter timestampWithoutTimeZonePrinter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getPrinter(); + HIVE_TIMESTAMP_PARSER = new DateTimeFormatterBuilder().append(timestampWithoutTimeZonePrinter, timestampWithoutTimeZoneParser).toFormatter().withZoneUTC(); + + try { + COMPRESSION_CODECS_FIELD = TextInputFormat.class.getDeclaredField("compressionCodecs"); + COMPRESSION_CODECS_FIELD.setAccessible(true); + } + catch (ReflectiveOperationException e) { + throw new AssertionError(e); + } + } + + private HiveUtil() + { + } + + public static InputFormat getInputFormat(Configuration configuration, String inputFormatName, boolean symlinkTarget) throws UserException { + try { + JobConf jobConf = new JobConf(configuration); + + Class> inputFormatClass = getInputFormatClass(jobConf, inputFormatName); + if (symlinkTarget && (inputFormatClass == SymlinkTextInputFormat.class)) { + // symlink targets are always TextInputFormat + inputFormatClass = TextInputFormat.class; + } + + return ReflectionUtils.newInstance(inputFormatClass, jobConf); + } + catch (ClassNotFoundException | RuntimeException e) { + throw new UserException("Unable to create input format " + inputFormatName, e); + } + } + + @SuppressWarnings({"unchecked", 
"RedundantCast"}) + private static Class> getInputFormatClass(JobConf conf, String inputFormatName) + throws ClassNotFoundException + { + // CDH uses different names for Parquet + if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName) || + "parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) { + return MapredParquetInputFormat.class; + } + + Class clazz = conf.getClassByName(inputFormatName); + return (Class>) clazz.asSubclass(InputFormat.class); + } + + public static List transformHiveSchema(List hiveSchema) throws AnalysisException { + List newSchema = Lists.newArrayList(); + for (FieldSchema hiveColumn : hiveSchema) { + try { + newSchema.add(HiveUtil.transformHiveField(hiveColumn)); + } catch (UnsupportedOperationException e) { + if (Config.iceberg_table_creation_strict_mode) { + throw e; + } + LOG.warn("Unsupported data type in Doris, ignore column[{}], with error: {}", + hiveColumn.getName(), e.getMessage()); + continue; + } + } + return newSchema; + } + + public static Column transformHiveField(FieldSchema field) { + TypeInfo hiveTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(field.getType()); + Type type = convertHiveTypeToiveDoris(hiveTypeInfo); + return new Column(field.getName(), type, false, null, true, null, field.getComment()); + } + + private static Type convertHiveTypeToiveDoris(TypeInfo hiveTypeInfo) { + + switch (hiveTypeInfo.getCategory()) { + case PRIMITIVE: { + PrimitiveTypeInfo pTypeInfo = (PrimitiveTypeInfo) hiveTypeInfo; + switch (pTypeInfo.getPrimitiveCategory()) { + case VOID: + return Type.NULL; + case BOOLEAN: + return Type.BOOLEAN; + case BYTE: + return Type.TINYINT; + case SHORT: + return Type.SMALLINT; + case INT: + return Type.INT; + case LONG: + return Type.BIGINT; + case FLOAT: + return Type.FLOAT; + case DOUBLE: + return Type.DOUBLE; + case STRING: + return Type.STRING; + case CHAR: + return Type.CHAR; + case VARCHAR: + return Type.VARCHAR; + case DATE: + return Type.DATE; + case TIMESTAMP: + return Type.DATETIME; + case DECIMAL: + return Type.DECIMALV2; + default: + throw new UnsupportedOperationException("Unsupported type: " + pTypeInfo.getPrimitiveCategory()); + } + } + case LIST: + TypeInfo elementTypeInfo = ((ListTypeInfo)hiveTypeInfo) + .getListElementTypeInfo(); + Type newType = null; + if (elementTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE) { + newType = convertHiveTypeToiveDoris(elementTypeInfo); + } else { + throw new UnsupportedOperationException("Unsupported type: " + hiveTypeInfo.toString()); + } + return new ArrayType(newType); + case MAP: + case STRUCT: + case UNION: + default: + throw new UnsupportedOperationException("Unsupported type: " + hiveTypeInfo.toString()); + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java index 0e8e350eb33058..cfc491e35e478e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java @@ -117,7 +117,7 @@ public TTableDescriptor toThrift() { thriftHudiTable.setTableName(getHmsTableName()); thriftHudiTable.setProperties(getTableProperties()); - TTableDescriptor thriftTableDescriptor = new TTableDescriptor(getId(), TTableType.HUDI_TABLE, + TTableDescriptor thriftTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE, fullSchema.size(), 0, getName(), ""); thriftTableDescriptor.setHudiTable(thriftHudiTable); return thriftTableDescriptor; 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java index b1a65ad41d4c21..024b3d0a1fe307 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java @@ -17,21 +17,32 @@ package org.apache.doris.external.hudi; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.HiveMetaStoreClientHelper; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import com.google.common.base.Strings; import com.google.common.collect.Maps; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.external.hive.util.HiveUtil; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Hudi utils. */ public class HudiUtils { + private static final Logger LOG = LogManager.getLogger(HudiUtils.class); private static final String PROPERTY_MISSING_MSG = "Hudi table %s is null. Please add properties('%s'='xxx') when create table"; @@ -117,12 +128,44 @@ public static void validateColumns(HudiTable table, Set hudiColumnNames = table.getFullSchema().stream() .map(x -> x.getName()).collect(Collectors.toSet()); - Set hiveTableColumnNames = hiveTable.getSd().getCols() - .stream().map(x -> x.getName()).collect(Collectors.toSet()); + Set hiveTableColumnNames = + Stream.concat(hiveTable.getSd().getCols().stream(), hiveTable.getPartitionKeys().stream()) + .map(x -> x.getName()).collect(Collectors.toSet()); hudiColumnNames.removeAll(hiveTableColumnNames); if (hudiColumnNames.size() > 0) { throw new DdlException(String.format("Hudi table's column(s): {%s} didn't exist in hive table. 
", String.join(", ", hudiColumnNames))); } } + + public static HudiTable resolveHudiTable(HudiTable table) throws AnalysisException { + String metastoreUris = table.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS); + org.apache.hadoop.hive.metastore.api.Table remoteHiveTable = null; + try { + remoteHiveTable = HiveMetaStoreClientHelper.getTable( + table.getHmsDatabaseName(), + table.getHmsTableName(), + metastoreUris); + } catch (DdlException e) { + LOG.error("Failed to get table from HiveMetaStore", e); + throw new AnalysisException(ErrorCode.ERR_UNKNOWN_ERROR.formatErrorMsg()); + } + if (remoteHiveTable == null) { + throw new AnalysisException(ErrorCode.ERR_UNKNOWN_TABLE.formatErrorMsg(table.getHmsTableName(), + "HiveMetaStore")); + } + if (!HudiUtils.isHudiTable(remoteHiveTable)) { + throw new AnalysisException(ErrorCode.ERR_UNKNOWN_TABLE.formatErrorMsg(table.getHmsTableName(), + "HiveMetaStore")); + } + + List newSchema = HiveUtil.transformHiveSchema(remoteHiveTable.getSd().getCols()); + table.setNewFullSchema(newSchema); + return table; + + } + + + + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index cc41ab22ff5111..3b5b9f08d6367e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -76,6 +76,7 @@ public class BrokerFileGroup implements Writable { private List fileSize; private List fileFieldNames; + // partition columnNames private List columnsFromPath; // columnExprList includes all fileFieldNames, columnsFromPath and column mappings // this param will be recreated by data desc when the log replay @@ -146,6 +147,26 @@ public BrokerFileGroup(IcebergTable table) throws UserException { this.fileFormat = table.getFileFormat(); } + /** + * Should used for hive/iceberg/hudi external table. 
+ */ + public BrokerFileGroup(long tableId, + String columnSeparator, + String lineDelimiter, + String filePath, + String fileFormat, + List columnsFromPath, + List columnExprList) throws AnalysisException { + this.tableId = tableId; + this.valueSeparator = Separator.convertSeparator(columnSeparator); + this.lineDelimiter = Separator.convertSeparator(lineDelimiter); + this.isNegative = false; + this.filePaths = Lists.newArrayList(filePath); + this.fileFormat = fileFormat; + this.columnsFromPath = columnsFromPath; + this.columnExprList = columnExprList; + } + public BrokerFileGroup(DataDescription dataDescription) { this.fileFieldNames = dataDescription.getFileFieldNames(); this.columnsFromPath = dataDescription.getColumnsFromPath(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 59fada152ea9e1..43035967f94d8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -125,7 +125,7 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) { private Analyzer analyzer; - private static class ParamCreateContext { + protected static class ParamCreateContext { public BrokerFileGroup fileGroup; public TBrokerScanRangeParams params; public TupleDescriptor srcTupleDescriptor; @@ -173,6 +173,10 @@ public void init(Analyzer analyzer) throws UserException { } } + public List getParamCreateContexts() { + return paramCreateContexts; + } + protected void initFileGroup() throws UserException { BrokerTable brokerTable = (BrokerTable) desc.getTable(); try { @@ -267,13 +271,15 @@ private void initColumns(ParamCreateContext context) throws UserException { } } - Load.initColumns(targetTable, columnDescs, - context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer, - context.srcTupleDescriptor, context.slotDescByName, context.params, - formatType(context.fileGroup.getFileFormat(), ""), VectorizedUtil.isVectorized()); + if (targetTable != null) { + Load.initColumns(targetTable, columnDescs, + context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer, + context.srcTupleDescriptor, context.slotDescByName, context.params, + formatType(context.fileGroup.getFileFormat(), ""), VectorizedUtil.isVectorized()); + } } - private TScanRangeLocations newLocations(TBrokerScanRangeParams params, BrokerDesc brokerDesc) + protected TScanRangeLocations newLocations(TBrokerScanRangeParams params, BrokerDesc brokerDesc) throws UserException { Backend selectedBackend; @@ -322,10 +328,6 @@ private TScanRangeLocations newLocations(TBrokerScanRangeParams params, BrokerDe return locations; } - private TBrokerScanRange brokerScanRange(TScanRangeLocations locations) { - return locations.scan_range.broker_scan_range; - } - private void getFileStatusAndCalcInstance() throws UserException { if (fileStatusesList == null || filesAdded == -1) { // FIXME(cmy): fileStatusesList and filesAdded can be set out of db lock when doing pull load, @@ -336,7 +338,10 @@ private void getFileStatusAndCalcInstance() throws UserException { filesAdded = 0; this.getFileStatus(); } - Preconditions.checkState(fileStatusesList.size() == fileGroups.size()); + // In hudiScanNode, calculate scan range using its own way which do not need fileStatusesList + if (!(this instanceof HudiScanNode)) { + Preconditions.checkState(fileStatusesList.size() == fileGroups.size()); + } if (isLoad() && filesAdded == 0) { throw new 
UserException("No source file in this table(" + targetTable.getName() + ")."); @@ -503,7 +508,7 @@ private void processFileGroup( rangeDesc.setNumAsString(context.fileGroup.isNumAsString()); rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine()); } - brokerScanRange(curLocations).addToRanges(rangeDesc); + curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); curFileOffset += rangeBytes; } else { @@ -516,7 +521,7 @@ private void processFileGroup( } rangeDesc.setReadByColumnDef(true); - brokerScanRange(curLocations).addToRanges(rangeDesc); + curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); curFileOffset = 0; i++; } @@ -544,7 +549,7 @@ private void processFileGroup( } rangeDesc.setReadByColumnDef(true); - brokerScanRange(curLocations).addToRanges(rangeDesc); + curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); curFileOffset = 0; curInstanceBytes += leftBytes; i++; @@ -552,7 +557,7 @@ private void processFileGroup( } // Put the last file - if (brokerScanRange(curLocations).isSetRanges()) { + if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) { locationsList.add(curLocations); } } @@ -568,7 +573,10 @@ private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileSt rangeDesc.setSplittable(fileStatus.isSplitable); rangeDesc.setStartOffset(curFileOffset); rangeDesc.setSize(rangeBytes); + // fileSize only be used when format is orc or parquet and TFileType is broker + // When TFileType is other type, it is not necessary rangeDesc.setFileSize(fileStatus.size); + // In Backend, will append columnsFromPath to the end of row after data scanned from file. rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile); rangeDesc.setColumnsFromPath(columnsFromPath); rangeDesc.setHeaderType(headerType); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java new file mode 100644 index 00000000000000..d1bd3cf672fac9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java @@ -0,0 +1,364 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ImportColumnDesc; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.HiveMetaStoreClientHelper; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.external.hive.util.HiveUtil; +import org.apache.doris.external.hudi.HudiProperty; +import org.apache.doris.external.hudi.HudiTable; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TBrokerRangeDesc; +import org.apache.doris.thrift.TBrokerScanRangeParams; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.THdfsParams; +import org.apache.doris.thrift.TScanRangeLocations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.mortbay.log.Log; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.stream.Collectors; + +public class HudiScanNode extends BrokerScanNode { + private static final Logger LOG = LogManager.getLogger(HudiScanNode.class); + + private HudiTable hudiTable; + // partition column predicates of hive table + private List hivePredicates = new ArrayList<>(); + private ExprNodeGenericFuncDesc hivePartitionPredicate; + private List parsedColumnExprList = new ArrayList<>(); + private String hdfsUri; + + private Table remoteHiveTable; + + /* hudi table properties */ + private String fileFormat; + private String inputFormatName; + private String basePath; + private List partitionKeys = new ArrayList<>(); + /* hudi table properties */ + + private List scanRangeLocations; + + public String getHdfsUri() { + return hdfsUri; + } + + public List getParsedColumnExprList() { + return parsedColumnExprList; + } + + public String getFileFormat() { + return fileFormat; + } + + public String getBasePath() { + return basePath; + } + + public List getPartitionKeys() { + return partitionKeys; + } + + public HudiScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName, + List> fileStatusesList, int filesAdded) { + super(id, destTupleDesc, planNodeName, fileStatusesList, filesAdded); + this.hudiTable = (HudiTable) destTupleDesc.getTable(); + } + + /** + * super init will invoke initFileGroup, In initFileGroup will do + * 
1, get hudi table from hive metastore + * 2, resolve hudi table type, query mode, table base path, partition columns information. + * 3. generate fileGroup + * + * @param analyzer + * @throws UserException + */ + @Override + public void init(Analyzer analyzer) throws UserException { + super.init(analyzer); + // init scan range params + initParams(analyzer); + } + + @Override + public int getNumInstances() { + return scanRangeLocations.size(); + } + + @Override + protected void initFileGroup() throws UserException { + resolvHiveTable(); + analyzeColumnFromPath(); + + HudiTable hudiTable = (HudiTable) desc.getTable(); + fileGroups = Lists.newArrayList( + new BrokerFileGroup(hudiTable.getId(), + "\t", + "\n", + getBasePath(), + getFileFormat(), + getPartitionKeys(), + getParsedColumnExprList())); + brokerDesc = new BrokerDesc("HudiTableDesc", StorageBackend.StorageType.HDFS, hudiTable.getTableProperties()); + + } + + /** + * Override this function just for skip parent's getFileStatus + */ + @Override + protected void getFileStatus() throws DdlException { + if (partitionKeys.size() > 0) { + extractHivePartitionPredicate(); + } + // set fileStatusesList as empty, we do not need fileStatusesList + fileStatusesList = Lists.newArrayList(); + filesAdded = 0; + } + + @Override + public void finalize(Analyzer analyzer) throws UserException { + try { + ParamCreateContext context = getParamCreateContexts().get(0); + finalizeParams(context.slotDescByName, context.exprMap, context.params, + context.srcTupleDescriptor, false, context.fileGroup.isNegative(), analyzer); + } catch (AnalysisException e) { + throw new UserException(e.getMessage()); + } + try { + buildScanRange(); + } catch (IOException e) { + LOG.error("Build scan range failed.", e); + throw new UserException("Build scan range failed.", e); + } + } + + @Override + public List getScanRangeLocations(long maxScanRangeLength) { + return scanRangeLocations; + } + + private void resolvHiveTable() throws DdlException { + this.remoteHiveTable = HiveMetaStoreClientHelper.getTable( + hudiTable.getHmsDatabaseName(), + hudiTable.getHmsTableName(), + hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS)); + + this.inputFormatName = remoteHiveTable.getSd().getInputFormat(); + this.fileFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(this.inputFormatName); + this.basePath = remoteHiveTable.getSd().getLocation(); + for (FieldSchema fieldSchema : remoteHiveTable.getPartitionKeys()) { + this.partitionKeys.add(fieldSchema.getName()); + } + Log.info("Hudi inputFileFormat is " + inputFormatName + ", basePath is " + this.basePath); + } + + private void initParams(Analyzer analyzer) { + ParamCreateContext context = getParamCreateContexts().get(0); + TBrokerScanRangeParams params = context.params; + + Map slotDescByName = Maps.newHashMap(); + + List columns = hudiTable.getBaseSchema(false); + // init slot desc add expr map, also transform hadoop functions + for (Column column : columns) { + SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor); + slotDesc.setType(column.getType()); + slotDesc.setIsMaterialized(true); + slotDesc.setIsNullable(true); + slotDesc.setColumn(column); + params.addToSrcSlotIds(slotDesc.getId().asInt()); + slotDescByName.put(column.getName(), slotDesc); + } + context.slotDescByName = slotDescByName; + } + + + /** + * Extracts partition predicate from SelectStmt.whereClause that can be pushed down to Hive + */ + private void extractHivePartitionPredicate() throws 
DdlException { + ListIterator it = conjuncts.listIterator(); + while (it.hasNext()) { + ExprNodeGenericFuncDesc hiveExpr = HiveMetaStoreClientHelper.convertToHivePartitionExpr( + it.next(), partitionKeys, hudiTable.getName()); + if (hiveExpr != null) { + hivePredicates.add(hiveExpr); + } + } + int count = hivePredicates.size(); + // combine all predicate by `and` + // compoundExprs must have at least 2 predicates + if (count >= 2) { + hivePartitionPredicate = HiveMetaStoreClientHelper.getCompoundExpr(hivePredicates, "and"); + } else if (count == 1) { + // only one predicate + hivePartitionPredicate = (ExprNodeGenericFuncDesc) hivePredicates.get(0); + } else { + // have no predicate, make a dummy predicate "1=1" to get all partitions + HiveMetaStoreClientHelper.ExprBuilder exprBuilder = + new HiveMetaStoreClientHelper.ExprBuilder(hudiTable.getName()); + hivePartitionPredicate = exprBuilder.val(TypeInfoFactory.intTypeInfo, 1) + .val(TypeInfoFactory.intTypeInfo, 1) + .pred("=", 2).build(); + } + } + + private InputSplit[] getSplits() throws UserException, IOException { + String splitsPath = basePath; + if (partitionKeys.size() > 0) { + extractHivePartitionPredicate(); + + String metaStoreUris = hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS); + List hivePartitions = + HiveMetaStoreClientHelper.getHivePartitions(metaStoreUris, remoteHiveTable, hivePartitionPredicate); + splitsPath = hivePartitions.stream() + .map(x -> x.getSd().getLocation()).collect(Collectors.joining(",")); + } + + + Configuration configuration = new Configuration(); + InputFormat inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false); + // alway get fileSplits from inputformat, + // because all hoodie input format have UseFileSplitsFromInputFormat annotation + JobConf jobConf = new JobConf(configuration); + FileInputFormat.setInputPaths(jobConf, splitsPath); + InputSplit[] inputSplits = inputFormat.getSplits(jobConf,0); + return inputSplits; + + } + + // If fileFormat is not null, we use fileFormat instead of check file's suffix + protected void buildScanRange() throws UserException, IOException { + scanRangeLocations = Lists.newArrayList(); + InputSplit[] inputSplits = getSplits(); + if (inputSplits.length == 0) { + return; + } + + THdfsParams tHdfsParams = new THdfsParams(); + String fullPath = ((FileSplit)inputSplits[0]).getPath().toUri().toString(); + String filePath = ((FileSplit)inputSplits[0]).getPath().toUri().getPath(); + String fsName = fullPath.replace(filePath, ""); + tHdfsParams.setFsName(fsName); + Log.info("Hudi path's host is " + fsName); + + TFileFormatType formatType = TFileFormatType.FORMAT_PARQUET; + ParamCreateContext context = getParamCreateContexts().get(0); + for(InputSplit split: inputSplits) { + FileSplit fileSplit = (FileSplit)split; + + TScanRangeLocations curLocations = newLocations(context.params, brokerDesc); + List partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), + getPartitionKeys()); + int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size(); + + TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, formatType, + partitionValuesFromPath, numberOfColumnsFromFile, brokerDesc); + rangeDesc.setHdfsParams(tHdfsParams); + rangeDesc.setReadByColumnDef(true); + + curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); + Log.info("Assign to backend " + curLocations.getLocations().get(0).getBackendId() + + " with hudi split: " + fileSplit.getPath() + + 
" ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")"); + + // Put the last file + if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) { + scanRangeLocations.add(curLocations); + } + } + } + + private TBrokerRangeDesc createBrokerRangeDesc(FileSplit fileSplit, + TFileFormatType formatType, + List columnsFromPath, int numberOfColumnsFromFile, + BrokerDesc brokerDesc) { + TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc(); + rangeDesc.setFileType(brokerDesc.getFileType()); + rangeDesc.setFormatType(formatType); + rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); + rangeDesc.setSplittable(true); + rangeDesc.setStartOffset(fileSplit.getStart()); + rangeDesc.setSize(fileSplit.getLength()); + rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile); + rangeDesc.setColumnsFromPath(columnsFromPath); + // set hdfs params for hdfs file type. + switch (brokerDesc.getFileType()) { + case FILE_HDFS: + BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc); + break; + } + return rangeDesc; + } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + if (!isLoad()) { + output.append(prefix).append("TABLE: ").append(hudiTable.getName()).append("\n"); + output.append(prefix).append("PATH: ") + .append(hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS)).append("\n"); + } + return output.toString(); + } + + /** + * Analyze columns from path, the partition columns + */ + private void analyzeColumnFromPath() { + for (String colName : partitionKeys) { + ImportColumnDesc importColumnDesc = new ImportColumnDesc(colName, null); + parsedColumnExprList.add(importColumnDesc); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 8d125c12047126..468176b27cca05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1708,6 +1708,10 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "IcebergScanNode", null, -1); break; + case HUDI: + scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HudiScanNode", + null, -1); + break; default: break; } diff --git a/fe/pom.xml b/fe/pom.xml index 40406208cfcc75..7c1045346ee197 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -323,6 +323,12 @@ under the License. 
+ + org.apache.hadoop + hadoop-client + 2.8.0 + compile + ${project.groupId} fe-common From 5a4dade465206021b6a24381a5fef754bb9bf92c Mon Sep 17 00:00:00 2001 From: dujunling Date: Tue, 24 May 2022 15:34:02 +0800 Subject: [PATCH 2/8] fix style --- .../org/apache/doris/analysis/Analyzer.java | 6 +- .../catalog/HiveMetaStoreClientHelper.java | 12 ++- .../doris/external/hive/util/HiveUtil.java | 92 ++++++++----------- .../apache/doris/external/hudi/HudiUtils.java | 12 ++- .../apache/doris/planner/HudiScanNode.java | 48 +++++----- .../doris/analysis/CreateTableStmtTest.java | 22 +++++ 6 files changed, 112 insertions(+), 80 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 7478a5290a3f2d..1dbe7fdb4c6eac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -619,7 +619,7 @@ public TableRef resolveTableRef(TableRef tableRef) throws AnalysisException { partition -> partition.getState() == PartitionState.RESTORE ).collect(Collectors.toList()).isEmpty(); - if(!isNotRestoring){ + if (!isNotRestoring) { // if doing restore with partitions, the status check push down to OlapScanNode::computePartitionInfo to // support query that partitions is not restoring. } else { @@ -628,9 +628,9 @@ public TableRef resolveTableRef(TableRef tableRef) throws AnalysisException { } } - if (table.getType() == TableType.HUDI && ((HudiTable) table).getFullSchema().isEmpty()) { + if (table.getType() == TableType.HUDI && table.getFullSchema().isEmpty()) { // resolve hudi table's schema when table schema is empty from doris meta - table = HudiUtils.resolveHudiTable((HudiTable)table); + table = HudiUtils.resolveHudiTable((HudiTable) table); } // tableName.getTbl() stores the table name specified by the user in the from statement. diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index f6bbf8c9d30b62..9b75ecd4d2d93b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -167,7 +167,8 @@ public static void dropClient(HiveMetaStoreClient client) { * @throws DdlException */ public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate, - List fileStatuses, Table remoteHiveTbl) throws DdlException { + List fileStatuses, + Table remoteHiveTbl) throws DdlException { List> remoteIterators; if (remoteHiveTbl.getPartitionKeys().size() > 0) { String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS); @@ -209,6 +210,15 @@ public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDe return hdfsUrl; } + /** + * list partitions from hiveMetaStore. + * + * @param metaStoreUris hiveMetaStore uris + * @param remoteHiveTbl Hive table + * @param hivePartitionPredicate filter when list partitions + * @return a list of hive partitions + * @throws DdlException when connect hiveMetaStore failed. 
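+     *
+     * <p>Illustrative call, mirroring HudiScanNode.getSplits in this series (the uris
+     * value is a hypothetical example):
+     * <pre>{@code
+     * List<Partition> parts = HiveMetaStoreClientHelper.getHivePartitions(
+     *         "thrift://127.0.0.1:9083", remoteHiveTbl, hivePartitionPredicate);
+     * }</pre>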
+ */ public static List getHivePartitions(String metaStoreUris, Table remoteHiveTbl, ExprNodeGenericFuncDesc hivePartitionPredicate) throws DdlException { List hivePartitions = new ArrayList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java index a841ffc9187b02..03d0d5ebec050a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java @@ -17,14 +17,13 @@ package org.apache.doris.external.hive.util; -import com.google.common.collect.Lists; import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; import org.apache.doris.common.UserException; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat; @@ -40,49 +39,29 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.DateTimeFormatterBuilder; -import org.joda.time.format.DateTimeParser; -import org.joda.time.format.DateTimePrinter; -import org.joda.time.format.ISODateTimeFormat; - -import java.lang.reflect.Field; + import java.util.List; +/** + * Hive util for create or query hive table. + */ public final class HiveUtil { private static final Logger LOG = LogManager.getLogger(HiveUtil.class); - private static final DateTimeFormatter HIVE_DATE_PARSER = ISODateTimeFormat.date().withZoneUTC(); - private static final DateTimeFormatter HIVE_TIMESTAMP_PARSER; - private static final Field COMPRESSION_CODECS_FIELD; - - static { - DateTimeParser[] timestampWithoutTimeZoneParser = { - DateTimeFormat.forPattern("yyyy-M-d").getParser(), - DateTimeFormat.forPattern("yyyy-M-d H:m").getParser(), - DateTimeFormat.forPattern("yyyy-M-d H:m:s").getParser(), - DateTimeFormat.forPattern("yyyy-M-d H:m:s.SSS").getParser(), - DateTimeFormat.forPattern("yyyy-M-d H:m:s.SSSSSSS").getParser(), - DateTimeFormat.forPattern("yyyy-M-d H:m:s.SSSSSSSSS").getParser(), - }; - DateTimePrinter timestampWithoutTimeZonePrinter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getPrinter(); - HIVE_TIMESTAMP_PARSER = new DateTimeFormatterBuilder().append(timestampWithoutTimeZonePrinter, timestampWithoutTimeZoneParser).toFormatter().withZoneUTC(); - - try { - COMPRESSION_CODECS_FIELD = TextInputFormat.class.getDeclaredField("compressionCodecs"); - COMPRESSION_CODECS_FIELD.setAccessible(true); - } - catch (ReflectiveOperationException e) { - throw new AssertionError(e); - } - } - - private HiveUtil() - { + private HiveUtil() { } - public static InputFormat getInputFormat(Configuration configuration, String inputFormatName, boolean symlinkTarget) throws UserException { + /** + * get input format class from inputFormatName. + * + * @param configuration jobConf used when getInputFormatClass + * @param inputFormatName inputFormat class name + * @param symlinkTarget use target inputFormat class when inputFormat is SymlinkTextInputFormat + * @return a class of inputFormat. + * @throws UserException when class not found. 
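+     *
+     * <p>Illustrative usage; inputFormatName is whatever the metastore reports, e.g. for a
+     * hudi read-optimized table it is org.apache.hudi.hadoop.HoodieParquetInputFormat:
+     * <pre>{@code
+     * InputFormat<?, ?> format = HiveUtil.getInputFormat(new Configuration(), inputFormatName, false);
+     * }</pre>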
+     */
+    public static InputFormat<?, ?> getInputFormat(Configuration configuration,
+            String inputFormatName, boolean symlinkTarget) throws UserException {
         try {
             JobConf jobConf = new JobConf(configuration);
@@ -93,19 +72,17 @@ private HiveUtil()
             }
             return ReflectionUtils.newInstance(inputFormatClass, jobConf);
-        }
-        catch (ClassNotFoundException | RuntimeException e) {
+        } catch (ClassNotFoundException | RuntimeException e) {
             throw new UserException("Unable to create input format " + inputFormatName, e);
         }
     }
     @SuppressWarnings({"unchecked", "RedundantCast"})
     private static Class<? extends InputFormat<?, ?>> getInputFormatClass(JobConf conf, String inputFormatName)
-            throws ClassNotFoundException
-    {
+            throws ClassNotFoundException {
         // CDH uses different names for Parquet
-        if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName) ||
-                "parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
+        if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName)
+                || "parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
             return MapredParquetInputFormat.class;
         }
         Class<?> clazz = conf.getClassByName(inputFormatName);
@@ -113,23 +90,33 @@ private HiveUtil()
         return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
     }
+    /**
+     * transform hiveSchema to Doris schema.
+     *
+     * @param hiveSchema hive schema
+     * @return doris schema
+     * @throws AnalysisException when transform failed.
+     */
     public static List<Column> transformHiveSchema(List<FieldSchema> hiveSchema) throws AnalysisException {
         List<Column> newSchema = Lists.newArrayList();
         for (FieldSchema hiveColumn : hiveSchema) {
             try {
                 newSchema.add(HiveUtil.transformHiveField(hiveColumn));
             } catch (UnsupportedOperationException e) {
                 LOG.warn("Unsupported data type in Doris, ignore column[{}], with error: {}",
                         hiveColumn.getName(), e.getMessage());
-                continue;
+                throw e;
             }
         }
         return newSchema;
     }
+    /**
+     * transform hive field to doris column.
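+     * For example, a hive field ("id", "int") maps to a Doris INT column.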
+ * + * @param field hive field to be transformed + * @return doris column + */ public static Column transformHiveField(FieldSchema field) { TypeInfo hiveTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(field.getType()); Type type = convertHiveTypeToiveDoris(hiveTypeInfo); @@ -140,8 +127,8 @@ private static Type convertHiveTypeToiveDoris(TypeInfo hiveTypeInfo) { switch (hiveTypeInfo.getCategory()) { case PRIMITIVE: { - PrimitiveTypeInfo pTypeInfo = (PrimitiveTypeInfo) hiveTypeInfo; - switch (pTypeInfo.getPrimitiveCategory()) { + PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) hiveTypeInfo; + switch (primitiveTypeInfo.getPrimitiveCategory()) { case VOID: return Type.NULL; case BOOLEAN: @@ -171,11 +158,12 @@ private static Type convertHiveTypeToiveDoris(TypeInfo hiveTypeInfo) { case DECIMAL: return Type.DECIMALV2; default: - throw new UnsupportedOperationException("Unsupported type: " + pTypeInfo.getPrimitiveCategory()); + throw new UnsupportedOperationException("Unsupported type: " + + primitiveTypeInfo.getPrimitiveCategory()); } } case LIST: - TypeInfo elementTypeInfo = ((ListTypeInfo)hiveTypeInfo) + TypeInfo elementTypeInfo = ((ListTypeInfo) hiveTypeInfo) .getListElementTypeInfo(); Type newType = null; if (elementTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java index 024b3d0a1fe307..c1c55b946058a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java @@ -21,14 +21,13 @@ import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.external.hive.util.HiveUtil; import com.google.common.base.Strings; import com.google.common.collect.Maps; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.external.hive.util.HiveUtil; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -138,6 +137,13 @@ public static void validateColumns(HudiTable table, } } + /** + * resolve hudi table from hive metaStore. + * + * @param table a doris hudi table + * @return a doris hudi table which has been resolved. 
+     * @throws AnalysisException when the remote table does not exist or is not a hudi table
+     */
     public static HudiTable resolveHudiTable(HudiTable table) throws AnalysisException {
         String metastoreUris = table.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
         org.apache.hadoop.hive.metastore.api.Table remoteHiveTable = null;
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
index d1bd3cf672fac9..c11974be9d6c80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
@@ -17,8 +17,6 @@
 package org.apache.doris.planner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.Expr;
@@ -43,6 +41,9 @@
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.THdfsParams;
 import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -66,6 +67,9 @@
 import java.util.Map;
 import java.util.stream.Collectors;
+/**
+ * Hudi scan node to query hudi table.
+ */
 public class HudiScanNode extends BrokerScanNode {
     private static final Logger LOG = LogManager.getLogger(HudiScanNode.class);
@@ -87,6 +91,12 @@ public class HudiScanNode extends BrokerScanNode {
     private List<TScanRangeLocations> scanRangeLocations;
+    public HudiScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName,
+                        List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) {
+        super(id, destTupleDesc, planNodeName, fileStatusesList, filesAdded);
+        this.hudiTable = (HudiTable) destTupleDesc.getTable();
+    }
+
     public String getHdfsUri() {
         return hdfsUri;
     }
@@ -107,20 +117,14 @@ public List<String> getPartitionKeys() {
         return partitionKeys;
     }
-    public HudiScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName,
-                        List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) {
-        super(id, destTupleDesc, planNodeName, fileStatusesList, filesAdded);
-        this.hudiTable = (HudiTable) destTupleDesc.getTable();
-    }
-
     /**
      * super init will invoke initFileGroup, In initFileGroup will do
      * 1, get hudi table from hive metastore
      * 2, resolve hudi table type, query mode, table base path, partition columns information.
      * 3. generate fileGroup
      *
-     * @param analyzer
-     * @throws UserException
+     * @param analyzer analyzer
+     * @throws UserException when init failed.
      */
     @Override
     public void init(Analyzer analyzer) throws UserException {
         super.init(analyzer);
@@ -153,7 +157,7 @@ protected void initFileGroup() throws UserException {
     }
     /**
-     * Override this function just for skip parent's getFileStatus
+     * Override this function just to skip the parent's getFileStatus.
      */
     @Override
     protected void getFileStatus() throws DdlException {
@@ -224,7 +228,7 @@ private void initParams(Analyzer analyzer) {
     /**
-     * Extracts partition predicate from SelectStmt.whereClause that can be pushed down to Hive
+     * Extracts partition predicate from SelectStmt.whereClause that can be pushed down to Hive.
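+     * <p>For example, WHERE dt = '20220521' AND id > 0 with partition column dt yields the
+     * hive expr (dt = '20220521'); when no conjunct is convertible, a dummy (1 = 1)
+     * predicate is built so that all partitions are listed.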
*/ private void extractHivePartitionPredicate() throws DdlException { ListIterator it = conjuncts.listIterator(); @@ -272,7 +276,7 @@ private InputSplit[] getSplits() throws UserException, IOException { // because all hoodie input format have UseFileSplitsFromInputFormat annotation JobConf jobConf = new JobConf(configuration); FileInputFormat.setInputPaths(jobConf, splitsPath); - InputSplit[] inputSplits = inputFormat.getSplits(jobConf,0); + InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 0); return inputSplits; } @@ -285,17 +289,17 @@ protected void buildScanRange() throws UserException, IOException { return; } - THdfsParams tHdfsParams = new THdfsParams(); - String fullPath = ((FileSplit)inputSplits[0]).getPath().toUri().toString(); - String filePath = ((FileSplit)inputSplits[0]).getPath().toUri().getPath(); + THdfsParams hdfsParams = new THdfsParams(); + String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString(); + String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath(); String fsName = fullPath.replace(filePath, ""); - tHdfsParams.setFsName(fsName); + hdfsParams.setFsName(fsName); Log.info("Hudi path's host is " + fsName); TFileFormatType formatType = TFileFormatType.FORMAT_PARQUET; ParamCreateContext context = getParamCreateContexts().get(0); - for(InputSplit split: inputSplits) { - FileSplit fileSplit = (FileSplit)split; + for (InputSplit split : inputSplits) { + FileSplit fileSplit = (FileSplit) split; TScanRangeLocations curLocations = newLocations(context.params, brokerDesc); List partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), @@ -304,7 +308,7 @@ protected void buildScanRange() throws UserException, IOException { TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(fileSplit, formatType, partitionValuesFromPath, numberOfColumnsFromFile, brokerDesc); - rangeDesc.setHdfsParams(tHdfsParams); + rangeDesc.setHdfsParams(hdfsParams); rangeDesc.setReadByColumnDef(true); curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); @@ -337,6 +341,8 @@ private TBrokerRangeDesc createBrokerRangeDesc(FileSplit fileSplit, case FILE_HDFS: BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc); break; + default: + break; } return rangeDesc; } @@ -353,7 +359,7 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { } /** - * Analyze columns from path, the partition columns + * Analyze columns from path, the partition columns. 
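+     * <p>For example, with partition keys [dt, hour] this registers ImportColumnDesc(dt) and
+     * ImportColumnDesc(hour); their values are later parsed from each split path (hive-style
+     * paths such as .../dt=2022-05-21/hour=12/) by BrokerUtil.parseColumnsFromPath.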
     */
    private void analyzeColumnFromPath() {
        for (String colName : partitionKeys) {
            ImportColumnDesc importColumnDesc = new ImportColumnDesc(colName, null);
            parsedColumnExprList.add(importColumnDesc);
        }
    }
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
index a9ce32b84ee176..70e9fdc7df1294 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
@@ -291,4 +291,26 @@ public void testCreateHudiTable() throws UserException {
                 + "\"hudi.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n"
                 + "\"hudi.table\" = \"test\")", stmt.toString());
     }
+
+    @Test
+    public void testCreateHudiTableWithSchema() throws UserException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("hudi.database", "doris");
+        properties.put("hudi.table", "test");
+        properties.put("hudi.hive.metastore.uris", "thrift://127.0.0.1:9087");
+        CreateTableStmt stmt = new CreateTableStmt(false, true, tblName, "hudi", properties, "");
+        ColumnDef idCol = new ColumnDef("id", TypeDef.create(PrimitiveType.INT));
+        stmt.addColumnDef(idCol);
+        ColumnDef nameCol = new ColumnDef("name", TypeDef.create(PrimitiveType.STRING));
+        stmt.addColumnDef(nameCol);
+        stmt.analyze(analyzer);
+
+        Assert.assertEquals("CREATE EXTERNAL TABLE `testCluster:db1`.`table1` (\n"
+                + "id int, \n"
+                + "name string\n"
+                + ") ENGINE = hudi\n"
+                + "PROPERTIES (\"hudi.database\" = \"doris\",\n"
+                + "\"hudi.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n"
+                + "\"hudi.table\" = \"test\")", stmt.toString());
+    }
 }

From 2cb1bba534862ed4389d540abc9609d18816ff64 Mon Sep 17 00:00:00 2001
From: dujunling
Date: Tue, 24 May 2022 16:16:54 +0800
Subject: [PATCH 3/8] refactor BrokerFileGroup

---
 .../apache/doris/load/BrokerFileGroup.java    | 32 ++++---------
 .../apache/doris/planner/HiveScanNode.java    |  2 +-
 .../apache/doris/planner/IcebergScanNode.java |  5 ++-
 3 files changed, 11 insertions(+), 28 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 3b5b9f08d6367e..d315776b45e9f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -27,7 +27,6 @@
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.HiveTable;
-import org.apache.doris.catalog.IcebergTable;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.OlapTable.OlapTableState;
@@ -37,7 +36,6 @@
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
-import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.load.loadv2.LoadTask;
@@ -120,31 +118,13 @@ public BrokerFileGroup(BrokerTable table) throws AnalysisException {
         this.fileFormat = table.getFileFormat();
     }
-    // Used for hive table, no need to parse
-    public BrokerFileGroup(HiveTable table,
-                           String columnSeparator,
-                           String lineDelimiter,
+    /**
+     * Should be used for hive/iceberg/hudi external tables.
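+     *
+     * <p>Illustrative usage, mirroring the IcebergScanNode.initFileGroup call in this patch:
+     * <pre>{@code
+     * new BrokerFileGroup(icebergTable.getId(), null, icebergTable.getFileFormat());
+     * }</pre>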
+     */
+    public BrokerFileGroup(long tableId,
                            String filePath,
-                           String fileFormat,
-                           List<String> columnsFromPath,
-                           List<ImportColumnDesc> columnExprList) throws AnalysisException {
-        this.tableId = table.getId();
-        this.valueSeparator = Separator.convertSeparator(columnSeparator);
-        this.lineDelimiter = Separator.convertSeparator(lineDelimiter);
-        this.isNegative = false;
-        this.filePaths = Lists.newArrayList(filePath);
-        this.fileFormat = fileFormat;
-        this.columnsFromPath = columnsFromPath;
-        this.columnExprList = columnExprList;
-    }
-
-    // Used for iceberg table, no need to parse
-    public BrokerFileGroup(IcebergTable table) throws UserException {
-        this.tableId = table.getId();
-        this.isNegative = false;
-        this.valueSeparator = "|";
-        this.lineDelimiter = "\n";
-        this.fileFormat = table.getFileFormat();
+                           String fileFormat) throws AnalysisException {
+        this(tableId, "|", "\n", filePath, fileFormat, null, null);
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
index 980701e6ca5426..078c6d97bf601d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
@@ -115,7 +115,7 @@ protected void initFileGroup() throws UserException {
         HiveTable hiveTable = (HiveTable) desc.getTable();
 
         fileGroups = Lists.newArrayList(
-                new BrokerFileGroup(hiveTable,
+                new BrokerFileGroup(hiveTable.getId(),
                         getColumnSeparator(),
                         getLineDelimiter(),
                         getPath(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
index 6bdda5272e72a8..4af73caf5105f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
@@ -57,7 +57,10 @@ public void init(Analyzer analyzer) throws UserException {
 
     @Override
     protected void initFileGroup() throws UserException {
-        fileGroups = Lists.newArrayList(new BrokerFileGroup(icebergTable));
+        fileGroups = Lists.newArrayList(
+                new BrokerFileGroup(icebergTable.getId(),
+                        null,
+                        icebergTable.getFileFormat()));
         brokerDesc = new BrokerDesc("IcebergTableDesc", icebergTable.getStorageType(),
                 icebergTable.getIcebergProperties());
         targetTable = icebergTable;

From f0b44a1537c19eab74f18459f55891aa216d309f Mon Sep 17 00:00:00 2001
From: dujunling
Date: Tue, 24 May 2022 17:27:18 +0800
Subject: [PATCH 4/8] set string column as non-key column

---
 .../org/apache/doris/analysis/CreateTableStmtTest.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
index 70e9fdc7df1294..badb0d5df2b12f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
@@ -301,13 +301,14 @@ public void testCreateHudiTableWithSchema() throws UserException {
         CreateTableStmt stmt = new CreateTableStmt(false, true, tblName, "hudi", properties, "");
         ColumnDef idCol = new ColumnDef("id", TypeDef.create(PrimitiveType.INT));
         stmt.addColumnDef(idCol);
-        ColumnDef nameCol = new ColumnDef("name", TypeDef.create(PrimitiveType.STRING));
+        ColumnDef nameCol = new ColumnDef("name", TypeDef.create(PrimitiveType.STRING), false,
+                null, true, ColumnDef.DefaultValue.NOT_SET, "");
         stmt.addColumnDef(nameCol);
         stmt.analyze(analyzer);
 
         Assert.assertEquals("CREATE EXTERNAL TABLE `testCluster:db1`.`table1` (\n"
-                + "id int, \n"
-                + "name string\n"
+                + "  `id` int(11) NULL COMMENT \"\",\n"
+                + "  `name` text NULL COMMENT \"\"\n"
                 + ") ENGINE = hudi\n"
                 + "PROPERTIES (\"hudi.database\" = \"doris\",\n"
                 + "\"hudi.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n"

From d75db54b4dd645b78cc5a9340a8fb4e1b0cc8b75 Mon Sep 17 00:00:00 2001
From: dujunling
Date: Wed, 25 May 2022 11:34:09 +0800
Subject: [PATCH 5/8] set src column type as varchar because parquet reader
 converts columns to strings after scan in BE.

---
 .../main/java/org/apache/doris/planner/HudiScanNode.java    | 6 ++++--
 .../java/org/apache/doris/analysis/CreateTableStmtTest.java | 6 +++---
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
index c11974be9d6c80..a6fbabbc3253b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
@@ -26,6 +26,8 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
@@ -216,10 +218,10 @@ private void initParams(Analyzer analyzer) {
         // init slot desc and expr map, also transform hadoop functions
         for (Column column : columns) {
             SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor);
-            slotDesc.setType(column.getType());
+            slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
             slotDesc.setIsMaterialized(true);
             slotDesc.setIsNullable(true);
-            slotDesc.setColumn(column);
+            slotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR));
             params.addToSrcSlotIds(slotDesc.getId().asInt());
             slotDescByName.put(column.getName(), slotDesc);
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
index badb0d5df2b12f..064593e3cbbc9a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java
@@ -301,14 +301,14 @@ public void testCreateHudiTableWithSchema() throws UserException {
         CreateTableStmt stmt = new CreateTableStmt(false, true, tblName, "hudi", properties, "");
         ColumnDef idCol = new ColumnDef("id", TypeDef.create(PrimitiveType.INT));
         stmt.addColumnDef(idCol);
-        ColumnDef nameCol = new ColumnDef("name", TypeDef.create(PrimitiveType.STRING), false,
+        ColumnDef nameCol = new ColumnDef("name", TypeDef.create(PrimitiveType.INT), false,
                 null, true, ColumnDef.DefaultValue.NOT_SET, "");
         stmt.addColumnDef(nameCol);
         stmt.analyze(analyzer);
 
         Assert.assertEquals("CREATE EXTERNAL TABLE `testCluster:db1`.`table1` (\n"
-                + "  `id` int(11) NULL COMMENT \"\",\n"
-                + "  `name` text NULL COMMENT \"\"\n"
+                + "  `id` int(11) NOT NULL COMMENT \"\",\n"
+                + "  `name` int(11) NULL COMMENT \"\"\n"
                 + ") ENGINE = hudi\n"
                 + "PROPERTIES (\"hudi.database\" = \"doris\",\n"
                 + "\"hudi.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n"

From d562342da2fefc75fa7b95dd2d5d522d59d00c07 Mon Sep 17 00:00:00 2001
From: dujunling
Date: Thu, 26 May 2022 19:52:45 +0800
Subject: [PATCH 6/8] detect hudi file format

---
 .../java/org/apache/doris/catalog/Catalog.java    |  3 +--
 .../org/apache/doris/planner/HudiScanNode.java    | 15 ++++++++++---
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 1952689d27bed1..525d35f7ac3a6f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -4143,11 +4143,10 @@ private void createHudiTable(Database db, CreateTableStmt stmt) throws DdlExcept
             HudiUtils.validateColumns(hudiTable, hiveTable);
         }
         switch (hiveTable.getTableType()) {
-            case "VIRTUAL_VIEW":
-                throw new DdlException("VIRTUAL_VIEW table is not supported.");
             case "EXTERNAL_TABLE":
             case "MANAGED_TABLE":
                 break;
+            case "VIRTUAL_VIEW":
             default:
                 throw new DdlException("unsupported hudi table type [" + hiveTable.getTableType() + "].");
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
index a6fbabbc3253b4..756441f3964cc8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java
@@ -66,6 +66,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -296,9 +297,17 @@ protected void buildScanRange() throws UserException, IOException {
         String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
         String fsName = fullPath.replace(filePath, "");
         hdfsParams.setFsName(fsName);
-        Log.info("Hudi path's host is " + fsName);
+        Log.debug("Hudi path's host is " + fsName);
+
+        TFileFormatType formatType = null;
+        if (this.inputFormatName.toUpperCase(Locale.ROOT).contains("PARQUET")) {
+            formatType = TFileFormatType.FORMAT_PARQUET;
+        } else if (this.inputFormatName.toUpperCase(Locale.ROOT).contains("ORC")) {
+            formatType = TFileFormatType.FORMAT_ORC;
+        } else {
+            throw new UserException("unsupported hudi file format [" + this.inputFormatName + "].");
+        }
 
-        TFileFormatType formatType = TFileFormatType.FORMAT_PARQUET;
         ParamCreateContext context = getParamCreateContexts().get(0);
         for (InputSplit split : inputSplits) {
             FileSplit fileSplit = (FileSplit) split;
@@ -314,7 +323,7 @@ protected void buildScanRange() throws UserException, IOException {
             rangeDesc.setReadByColumnDef(true);
             curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc);
 
-            Log.info("Assign to backend " + curLocations.getLocations().get(0).getBackendId()
+            Log.debug("Assign to backend " + curLocations.getLocations().get(0).getBackendId()
                     + " with hudi split: " + fileSplit.getPath()
                     + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")");

From 1771753a2907fea0da02eb378e6ba8ad16554f46 Mon Sep 17 00:00:00 2001
From: dujunling
Date: Fri, 27 May 2022 10:48:13 +0800
Subject: [PATCH 7/8] do not change database's table cache

---
 .../java/org/apache/doris/external/hudi/HudiUtils.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java
index c1c55b946058a0..d7948bf1d2d0c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java
@@ -166,9 +166,11 @@ public static HudiTable resolveHudiTable(HudiTable table) throws AnalysisExcepti
         }
         List<Column> newSchema = HiveUtil.transformHiveSchema(remoteHiveTable.getSd().getCols());
-        table.setNewFullSchema(newSchema);
-        return table;
-
+        HudiTable tableWithSchema = new HudiTable(table.getId(),
+            table.getName(),
+            newSchema,
+            table.getTableProperties());
+        return tableWithSchema;
     }

From f129d01843500781a585d955d3863e39401cc968 Mon Sep 17 00:00:00 2001
From: dujunling
Date: Fri, 27 May 2022 14:01:13 +0800
Subject: [PATCH 8/8] style

---
 .../main/java/org/apache/doris/external/hudi/HudiUtils.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java
index d7948bf1d2d0c4..757fdc2d45ca9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java
@@ -167,9 +167,9 @@ public static HudiTable resolveHudiTable(HudiTable table) throws AnalysisExcepti
         List<Column> newSchema = HiveUtil.transformHiveSchema(remoteHiveTable.getSd().getCols());
         HudiTable tableWithSchema = new HudiTable(table.getId(),
-            table.getName(),
-            newSchema,
-            table.getTableProperties());
+                table.getName(),
+                newSchema,
+                table.getTableProperties());
         return tableWithSchema;
     }
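
For reference, a minimal standalone sketch of the file format detection that PATCH 6/8 adds to HudiScanNode.buildScanRange(). The helper method and its name are illustrative only, not part of the patches; TFileFormatType and UserException are the Doris FE types already used in the hunks above.

    import java.util.Locale;

    // Sketch only, mirroring the PATCH 6/8 hunk. Hudi reports input format
    // class names such as "org.apache.hudi.hadoop.HoodieParquetInputFormat".
    // Both sides of the comparison must use the same case: upper-casing the
    // name and then matching a lower-case needle like "parquet" would never
    // succeed, so the needles below are upper-case as well.
    static TFileFormatType toFormatType(String inputFormatName) throws UserException {
        String name = inputFormatName.toUpperCase(Locale.ROOT);
        if (name.contains("PARQUET")) {
            return TFileFormatType.FORMAT_PARQUET;
        }
        if (name.contains("ORC")) {
            return TFileFormatType.FORMAT_ORC;
        }
        throw new UserException("unsupported hudi file format [" + inputFormatName + "].");
    }

Matching on the upper-cased class name also keeps the check robust across format variants such as HoodieParquetRealtimeInputFormat, which should still map to FORMAT_PARQUET.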