diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 009cf9df9043af..58e6a8e5497ca8 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -68,6 +68,13 @@ under the License. pom import + + + org.apache.hadoop + hadoop-mapreduce-client + 2.7.4 + compile + diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index a47e8a9b5e9b9d..1dbe7fdb4c6eac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -38,6 +38,8 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.VecNotImplException; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.external.hudi.HudiTable; +import org.apache.doris.external.hudi.HudiUtils; import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.RuntimeFilter; import org.apache.doris.qe.ConnectContext; @@ -617,7 +619,7 @@ public TableRef resolveTableRef(TableRef tableRef) throws AnalysisException { partition -> partition.getState() == PartitionState.RESTORE ).collect(Collectors.toList()).isEmpty(); - if(!isNotRestoring){ + if (!isNotRestoring) { // if doing restore with partitions, the status check push down to OlapScanNode::computePartitionInfo to // support query that partitions is not restoring. } else { @@ -626,6 +628,11 @@ public TableRef resolveTableRef(TableRef tableRef) throws AnalysisException { } } + if (table.getType() == TableType.HUDI && table.getFullSchema().isEmpty()) { + // resolve hudi table's schema when table schema is empty from doris meta + table = HudiUtils.resolveHudiTable((HudiTable) table); + } + // tableName.getTbl() stores the table name specified by the user in the from statement. // In the case of case-sensitive table names, the value of tableName.getTbl() is the same as table.getName(). 
// However, since the system view is not case-sensitive, table.getName() gets the lowercase view name, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 69fe06925830d2..525d35f7ac3a6f 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4142,6 +4142,14 @@ private void createHudiTable(Database db, CreateTableStmt stmt) throws DdlExcept if (!hudiTable.getFullSchema().isEmpty()) { HudiUtils.validateColumns(hudiTable, hiveTable); } + switch (hiveTable.getTableType()) { + case "EXTERNAL_TABLE": + case "MANAGED_TABLE": + break; + case "VIRTUAL_VIEW": + default: + throw new DdlException("unsupported hudi table type [" + hiveTable.getTableType() + "]."); + } // check hive table if exists in doris database if (!db.createTableWithLock(hudiTable, false, stmt.isSetIfNotExists()).first) { ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index 53e27dfc6c127b..9b75ecd4d2d93b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -158,7 +158,7 @@ public static void dropClient(HiveMetaStoreClient client) { } /** - * Get data files of partitions in hive table, filter by partition predicate + * Get data files of partitions in hive table, filter by partition predicate. * @param hiveTable * @param hivePartitionPredicate * @param fileStatuses @@ -167,22 +167,13 @@ public static void dropClient(HiveMetaStoreClient client) { * @throws DdlException */ public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate, - List fileStatuses, Table remoteHiveTbl) throws DdlException { - HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS)); - + List fileStatuses, + Table remoteHiveTbl) throws DdlException { List> remoteIterators; if (remoteHiveTbl.getPartitionKeys().size() > 0) { + String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS); // hive partitioned table, get file iterator from table partition sd info - List hivePartitions = new ArrayList<>(); - try { - client.listPartitionsByExpr(hiveTable.getHiveDb(), hiveTable.getHiveTable(), - SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), null, (short) -1, hivePartitions); - } catch (TException e) { - LOG.warn("Hive metastore thrift exception: {}", e.getMessage()); - throw new DdlException("Connect hive metastore failed. Error: " + e.getMessage()); - } finally { - client.close(); - } + List hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, hivePartitionPredicate); remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties()); } else { // hive non-partitioned table, get file iterator from table sd info @@ -219,6 +210,32 @@ public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDe return hdfsUrl; } + /** + * list partitions from hiveMetaStore. 
+ * + * @param metaStoreUris hiveMetaStore uris + * @param remoteHiveTbl Hive table + * @param hivePartitionPredicate filter when list partitions + * @return a list of hive partitions + * @throws DdlException when connect hiveMetaStore failed. + */ + public static List getHivePartitions(String metaStoreUris, Table remoteHiveTbl, + ExprNodeGenericFuncDesc hivePartitionPredicate) throws DdlException { + List hivePartitions = new ArrayList<>(); + HiveMetaStoreClient client = getClient(metaStoreUris); + try { + client.listPartitionsByExpr(remoteHiveTbl.getDbName(), remoteHiveTbl.getTableName(), + SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), + null, (short) -1, hivePartitions); + } catch (TException e) { + LOG.warn("Hive metastore thrift exception: {}", e.getMessage()); + throw new DdlException("Connect hive metastore failed."); + } finally { + client.close(); + } + return hivePartitions; + } + private static List> getRemoteIterator(List partitions, Map properties) throws DdlException { List> iterators = new ArrayList<>(); Configuration configuration = new Configuration(false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java new file mode 100644 index 00000000000000..03d0d5ebec050a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.external.hive.util; + +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +/** + * Hive util for create or query hive table. 
+ */
+public final class HiveUtil {
+    private static final Logger LOG = LogManager.getLogger(HiveUtil.class);
+
+    private HiveUtil() {
+    }
+
+    /**
+     * get input format class from inputFormatName.
+     *
+     * @param configuration jobConf used when getInputFormatClass
+     * @param inputFormatName inputFormat class name
+     * @param symlinkTarget use target inputFormat class when inputFormat is SymlinkTextInputFormat
+     * @return a class of inputFormat.
+     * @throws UserException when class not found.
+     */
+    public static InputFormat<?, ?> getInputFormat(Configuration configuration,
+            String inputFormatName, boolean symlinkTarget) throws UserException {
+        try {
+            JobConf jobConf = new JobConf(configuration);
+
+            Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
+            if (symlinkTarget && (inputFormatClass == SymlinkTextInputFormat.class)) {
+                // symlink targets are always TextInputFormat
+                inputFormatClass = TextInputFormat.class;
+            }
+
+            return ReflectionUtils.newInstance(inputFormatClass, jobConf);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            throw new UserException("Unable to create input format " + inputFormatName, e);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "RedundantCast"})
+    private static Class<? extends InputFormat<?, ?>> getInputFormatClass(JobConf conf, String inputFormatName)
+            throws ClassNotFoundException {
+        // CDH uses different names for Parquet
+        if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName)
+                || "parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
+            return MapredParquetInputFormat.class;
+        }
+
+        Class<?> clazz = conf.getClassByName(inputFormatName);
+        return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
+    }
+
+    /**
+     * transform hiveSchema to Doris schema.
+     *
+     * @param hiveSchema hive schema
+     * @return doris schema
+     * @throws AnalysisException when transform failed.
+     */
+    public static List<Column> transformHiveSchema(List<FieldSchema> hiveSchema) throws AnalysisException {
+        List<Column> newSchema = Lists.newArrayList();
+        for (FieldSchema hiveColumn : hiveSchema) {
+            try {
+                newSchema.add(HiveUtil.transformHiveField(hiveColumn));
+            } catch (UnsupportedOperationException e) {
+                LOG.warn("Unsupported data type in Doris, ignore column[{}], with error: {}",
+                        hiveColumn.getName(), e.getMessage());
+                throw e;
+            }
+        }
+        return newSchema;
+    }
+
+    /**
+     * transform hive field to doris column.
+ * + * @param field hive field to be transformed + * @return doris column + */ + public static Column transformHiveField(FieldSchema field) { + TypeInfo hiveTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(field.getType()); + Type type = convertHiveTypeToiveDoris(hiveTypeInfo); + return new Column(field.getName(), type, false, null, true, null, field.getComment()); + } + + private static Type convertHiveTypeToiveDoris(TypeInfo hiveTypeInfo) { + + switch (hiveTypeInfo.getCategory()) { + case PRIMITIVE: { + PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) hiveTypeInfo; + switch (primitiveTypeInfo.getPrimitiveCategory()) { + case VOID: + return Type.NULL; + case BOOLEAN: + return Type.BOOLEAN; + case BYTE: + return Type.TINYINT; + case SHORT: + return Type.SMALLINT; + case INT: + return Type.INT; + case LONG: + return Type.BIGINT; + case FLOAT: + return Type.FLOAT; + case DOUBLE: + return Type.DOUBLE; + case STRING: + return Type.STRING; + case CHAR: + return Type.CHAR; + case VARCHAR: + return Type.VARCHAR; + case DATE: + return Type.DATE; + case TIMESTAMP: + return Type.DATETIME; + case DECIMAL: + return Type.DECIMALV2; + default: + throw new UnsupportedOperationException("Unsupported type: " + + primitiveTypeInfo.getPrimitiveCategory()); + } + } + case LIST: + TypeInfo elementTypeInfo = ((ListTypeInfo) hiveTypeInfo) + .getListElementTypeInfo(); + Type newType = null; + if (elementTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE) { + newType = convertHiveTypeToiveDoris(elementTypeInfo); + } else { + throw new UnsupportedOperationException("Unsupported type: " + hiveTypeInfo.toString()); + } + return new ArrayType(newType); + case MAP: + case STRUCT: + case UNION: + default: + throw new UnsupportedOperationException("Unsupported type: " + hiveTypeInfo.toString()); + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java index 0e8e350eb33058..cfc491e35e478e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java @@ -117,7 +117,7 @@ public TTableDescriptor toThrift() { thriftHudiTable.setTableName(getHmsTableName()); thriftHudiTable.setProperties(getTableProperties()); - TTableDescriptor thriftTableDescriptor = new TTableDescriptor(getId(), TTableType.HUDI_TABLE, + TTableDescriptor thriftTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE, fullSchema.size(), 0, getName(), ""); thriftTableDescriptor.setHudiTable(thriftHudiTable); return thriftTableDescriptor; diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java index b1a65ad41d4c21..757fdc2d45ca9b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiUtils.java @@ -17,21 +17,31 @@ package org.apache.doris.external.hudi; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.HiveMetaStoreClientHelper; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.external.hive.util.HiveUtil; import com.google.common.base.Strings; import com.google.common.collect.Maps; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import 
org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Hudi utils. */ public class HudiUtils { + private static final Logger LOG = LogManager.getLogger(HudiUtils.class); private static final String PROPERTY_MISSING_MSG = "Hudi table %s is null. Please add properties('%s'='xxx') when create table"; @@ -117,12 +127,53 @@ public static void validateColumns(HudiTable table, Set hudiColumnNames = table.getFullSchema().stream() .map(x -> x.getName()).collect(Collectors.toSet()); - Set hiveTableColumnNames = hiveTable.getSd().getCols() - .stream().map(x -> x.getName()).collect(Collectors.toSet()); + Set hiveTableColumnNames = + Stream.concat(hiveTable.getSd().getCols().stream(), hiveTable.getPartitionKeys().stream()) + .map(x -> x.getName()).collect(Collectors.toSet()); hudiColumnNames.removeAll(hiveTableColumnNames); if (hudiColumnNames.size() > 0) { throw new DdlException(String.format("Hudi table's column(s): {%s} didn't exist in hive table. ", String.join(", ", hudiColumnNames))); } } + + /** + * resolve hudi table from hive metaStore. + * + * @param table a doris hudi table + * @return a doris hudi table which has been resolved. + * @throws AnalysisException when remoteTable is not exist or not a hudi table + */ + public static HudiTable resolveHudiTable(HudiTable table) throws AnalysisException { + String metastoreUris = table.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS); + org.apache.hadoop.hive.metastore.api.Table remoteHiveTable = null; + try { + remoteHiveTable = HiveMetaStoreClientHelper.getTable( + table.getHmsDatabaseName(), + table.getHmsTableName(), + metastoreUris); + } catch (DdlException e) { + LOG.error("Failed to get table from HiveMetaStore", e); + throw new AnalysisException(ErrorCode.ERR_UNKNOWN_ERROR.formatErrorMsg()); + } + if (remoteHiveTable == null) { + throw new AnalysisException(ErrorCode.ERR_UNKNOWN_TABLE.formatErrorMsg(table.getHmsTableName(), + "HiveMetaStore")); + } + if (!HudiUtils.isHudiTable(remoteHiveTable)) { + throw new AnalysisException(ErrorCode.ERR_UNKNOWN_TABLE.formatErrorMsg(table.getHmsTableName(), + "HiveMetaStore")); + } + + List newSchema = HiveUtil.transformHiveSchema(remoteHiveTable.getSd().getCols()); + HudiTable tableWithSchema = new HudiTable(table.getId(), + table.getName(), + newSchema, + table.getTableProperties()); + return tableWithSchema; + } + + + + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java index cc41ab22ff5111..d315776b45e9f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -27,7 +27,6 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.HiveTable; -import org.apache.doris.catalog.IcebergTable; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.OlapTable.OlapTableState; @@ -37,7 +36,6 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; -import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import 
org.apache.doris.common.io.Writable; import org.apache.doris.load.loadv2.LoadTask; @@ -76,6 +74,7 @@ public class BrokerFileGroup implements Writable { private List fileSize; private List fileFieldNames; + // partition columnNames private List columnsFromPath; // columnExprList includes all fileFieldNames, columnsFromPath and column mappings // this param will be recreated by data desc when the log replay @@ -119,15 +118,26 @@ public BrokerFileGroup(BrokerTable table) throws AnalysisException { this.fileFormat = table.getFileFormat(); } - // Used for hive table, no need to parse - public BrokerFileGroup(HiveTable table, - String columnSeparator, - String lineDelimiter, + /** + * Should used for hive/iceberg/hudi external table. + */ + public BrokerFileGroup(long tableId, String filePath, - String fileFormat, - List columnsFromPath, - List columnExprList) throws AnalysisException { - this.tableId = table.getId(); + String fileFormat) throws AnalysisException { + this(tableId, "|", "\n", filePath, fileFormat, null, null); + } + + /** + * Should used for hive/iceberg/hudi external table. + */ + public BrokerFileGroup(long tableId, + String columnSeparator, + String lineDelimiter, + String filePath, + String fileFormat, + List columnsFromPath, + List columnExprList) throws AnalysisException { + this.tableId = tableId; this.valueSeparator = Separator.convertSeparator(columnSeparator); this.lineDelimiter = Separator.convertSeparator(lineDelimiter); this.isNegative = false; @@ -137,15 +147,6 @@ public BrokerFileGroup(HiveTable table, this.columnExprList = columnExprList; } - // Used for iceberg table, no need to parse - public BrokerFileGroup(IcebergTable table) throws UserException { - this.tableId = table.getId(); - this.isNegative = false; - this.valueSeparator = "|"; - this.lineDelimiter = "\n"; - this.fileFormat = table.getFileFormat(); - } - public BrokerFileGroup(DataDescription dataDescription) { this.fileFieldNames = dataDescription.getFileFieldNames(); this.columnsFromPath = dataDescription.getColumnsFromPath(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 59fada152ea9e1..43035967f94d8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -125,7 +125,7 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) { private Analyzer analyzer; - private static class ParamCreateContext { + protected static class ParamCreateContext { public BrokerFileGroup fileGroup; public TBrokerScanRangeParams params; public TupleDescriptor srcTupleDescriptor; @@ -173,6 +173,10 @@ public void init(Analyzer analyzer) throws UserException { } } + public List getParamCreateContexts() { + return paramCreateContexts; + } + protected void initFileGroup() throws UserException { BrokerTable brokerTable = (BrokerTable) desc.getTable(); try { @@ -267,13 +271,15 @@ private void initColumns(ParamCreateContext context) throws UserException { } } - Load.initColumns(targetTable, columnDescs, - context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer, - context.srcTupleDescriptor, context.slotDescByName, context.params, - formatType(context.fileGroup.getFileFormat(), ""), VectorizedUtil.isVectorized()); + if (targetTable != null) { + Load.initColumns(targetTable, columnDescs, + context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer, + context.srcTupleDescriptor, 
context.slotDescByName, context.params, + formatType(context.fileGroup.getFileFormat(), ""), VectorizedUtil.isVectorized()); + } } - private TScanRangeLocations newLocations(TBrokerScanRangeParams params, BrokerDesc brokerDesc) + protected TScanRangeLocations newLocations(TBrokerScanRangeParams params, BrokerDesc brokerDesc) throws UserException { Backend selectedBackend; @@ -322,10 +328,6 @@ private TScanRangeLocations newLocations(TBrokerScanRangeParams params, BrokerDe return locations; } - private TBrokerScanRange brokerScanRange(TScanRangeLocations locations) { - return locations.scan_range.broker_scan_range; - } - private void getFileStatusAndCalcInstance() throws UserException { if (fileStatusesList == null || filesAdded == -1) { // FIXME(cmy): fileStatusesList and filesAdded can be set out of db lock when doing pull load, @@ -336,7 +338,10 @@ private void getFileStatusAndCalcInstance() throws UserException { filesAdded = 0; this.getFileStatus(); } - Preconditions.checkState(fileStatusesList.size() == fileGroups.size()); + // In hudiScanNode, calculate scan range using its own way which do not need fileStatusesList + if (!(this instanceof HudiScanNode)) { + Preconditions.checkState(fileStatusesList.size() == fileGroups.size()); + } if (isLoad() && filesAdded == 0) { throw new UserException("No source file in this table(" + targetTable.getName() + ")."); @@ -503,7 +508,7 @@ private void processFileGroup( rangeDesc.setNumAsString(context.fileGroup.isNumAsString()); rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine()); } - brokerScanRange(curLocations).addToRanges(rangeDesc); + curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); curFileOffset += rangeBytes; } else { @@ -516,7 +521,7 @@ private void processFileGroup( } rangeDesc.setReadByColumnDef(true); - brokerScanRange(curLocations).addToRanges(rangeDesc); + curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); curFileOffset = 0; i++; } @@ -544,7 +549,7 @@ private void processFileGroup( } rangeDesc.setReadByColumnDef(true); - brokerScanRange(curLocations).addToRanges(rangeDesc); + curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); curFileOffset = 0; curInstanceBytes += leftBytes; i++; @@ -552,7 +557,7 @@ private void processFileGroup( } // Put the last file - if (brokerScanRange(curLocations).isSetRanges()) { + if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) { locationsList.add(curLocations); } } @@ -568,7 +573,10 @@ private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileSt rangeDesc.setSplittable(fileStatus.isSplitable); rangeDesc.setStartOffset(curFileOffset); rangeDesc.setSize(rangeBytes); + // fileSize only be used when format is orc or parquet and TFileType is broker + // When TFileType is other type, it is not necessary rangeDesc.setFileSize(fileStatus.size); + // In Backend, will append columnsFromPath to the end of row after data scanned from file. 
rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile); rangeDesc.setColumnsFromPath(columnsFromPath); rangeDesc.setHeaderType(headerType); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java index 980701e6ca5426..078c6d97bf601d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java @@ -115,7 +115,7 @@ protected void initFileGroup() throws UserException { HiveTable hiveTable = (HiveTable) desc.getTable(); fileGroups = Lists.newArrayList( - new BrokerFileGroup(hiveTable, + new BrokerFileGroup(hiveTable.getId(), getColumnSeparator(), getLineDelimiter(), getPath(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java new file mode 100644 index 00000000000000..756441f3964cc8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HudiScanNode.java @@ -0,0 +1,381 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ImportColumnDesc; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.HiveMetaStoreClientHelper; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.external.hive.util.HiveUtil; +import org.apache.doris.external.hudi.HudiProperty; +import org.apache.doris.external.hudi.HudiTable; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TBrokerRangeDesc; +import org.apache.doris.thrift.TBrokerScanRangeParams; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.THdfsParams; +import org.apache.doris.thrift.TScanRangeLocations; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.mortbay.log.Log; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Hudi scan node to query hudi table. 
+ */ +public class HudiScanNode extends BrokerScanNode { + private static final Logger LOG = LogManager.getLogger(HudiScanNode.class); + + private HudiTable hudiTable; + // partition column predicates of hive table + private List hivePredicates = new ArrayList<>(); + private ExprNodeGenericFuncDesc hivePartitionPredicate; + private List parsedColumnExprList = new ArrayList<>(); + private String hdfsUri; + + private Table remoteHiveTable; + + /* hudi table properties */ + private String fileFormat; + private String inputFormatName; + private String basePath; + private List partitionKeys = new ArrayList<>(); + /* hudi table properties */ + + private List scanRangeLocations; + + public HudiScanNode(PlanNodeId id, TupleDescriptor destTupleDesc, String planNodeName, + List> fileStatusesList, int filesAdded) { + super(id, destTupleDesc, planNodeName, fileStatusesList, filesAdded); + this.hudiTable = (HudiTable) destTupleDesc.getTable(); + } + + public String getHdfsUri() { + return hdfsUri; + } + + public List getParsedColumnExprList() { + return parsedColumnExprList; + } + + public String getFileFormat() { + return fileFormat; + } + + public String getBasePath() { + return basePath; + } + + public List getPartitionKeys() { + return partitionKeys; + } + + /** + * super init will invoke initFileGroup, In initFileGroup will do + * 1, get hudi table from hive metastore + * 2, resolve hudi table type, query mode, table base path, partition columns information. + * 3. generate fileGroup + * + * @param analyzer analyzer + * @throws UserException when init failed. + */ + @Override + public void init(Analyzer analyzer) throws UserException { + super.init(analyzer); + // init scan range params + initParams(analyzer); + } + + @Override + public int getNumInstances() { + return scanRangeLocations.size(); + } + + @Override + protected void initFileGroup() throws UserException { + resolvHiveTable(); + analyzeColumnFromPath(); + + HudiTable hudiTable = (HudiTable) desc.getTable(); + fileGroups = Lists.newArrayList( + new BrokerFileGroup(hudiTable.getId(), + "\t", + "\n", + getBasePath(), + getFileFormat(), + getPartitionKeys(), + getParsedColumnExprList())); + brokerDesc = new BrokerDesc("HudiTableDesc", StorageBackend.StorageType.HDFS, hudiTable.getTableProperties()); + + } + + /** + * Override this function just for skip parent's getFileStatus. 
+ */ + @Override + protected void getFileStatus() throws DdlException { + if (partitionKeys.size() > 0) { + extractHivePartitionPredicate(); + } + // set fileStatusesList as empty, we do not need fileStatusesList + fileStatusesList = Lists.newArrayList(); + filesAdded = 0; + } + + @Override + public void finalize(Analyzer analyzer) throws UserException { + try { + ParamCreateContext context = getParamCreateContexts().get(0); + finalizeParams(context.slotDescByName, context.exprMap, context.params, + context.srcTupleDescriptor, false, context.fileGroup.isNegative(), analyzer); + } catch (AnalysisException e) { + throw new UserException(e.getMessage()); + } + try { + buildScanRange(); + } catch (IOException e) { + LOG.error("Build scan range failed.", e); + throw new UserException("Build scan range failed.", e); + } + } + + @Override + public List getScanRangeLocations(long maxScanRangeLength) { + return scanRangeLocations; + } + + private void resolvHiveTable() throws DdlException { + this.remoteHiveTable = HiveMetaStoreClientHelper.getTable( + hudiTable.getHmsDatabaseName(), + hudiTable.getHmsTableName(), + hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS)); + + this.inputFormatName = remoteHiveTable.getSd().getInputFormat(); + this.fileFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(this.inputFormatName); + this.basePath = remoteHiveTable.getSd().getLocation(); + for (FieldSchema fieldSchema : remoteHiveTable.getPartitionKeys()) { + this.partitionKeys.add(fieldSchema.getName()); + } + Log.info("Hudi inputFileFormat is " + inputFormatName + ", basePath is " + this.basePath); + } + + private void initParams(Analyzer analyzer) { + ParamCreateContext context = getParamCreateContexts().get(0); + TBrokerScanRangeParams params = context.params; + + Map slotDescByName = Maps.newHashMap(); + + List columns = hudiTable.getBaseSchema(false); + // init slot desc add expr map, also transform hadoop functions + for (Column column : columns) { + SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(context.srcTupleDescriptor); + slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); + slotDesc.setIsMaterialized(true); + slotDesc.setIsNullable(true); + slotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR)); + params.addToSrcSlotIds(slotDesc.getId().asInt()); + slotDescByName.put(column.getName(), slotDesc); + } + context.slotDescByName = slotDescByName; + } + + + /** + * Extracts partition predicate from SelectStmt.whereClause that can be pushed down to Hive. 
+     */
+    private void extractHivePartitionPredicate() throws DdlException {
+        ListIterator<Expr> it = conjuncts.listIterator();
+        while (it.hasNext()) {
+            ExprNodeGenericFuncDesc hiveExpr = HiveMetaStoreClientHelper.convertToHivePartitionExpr(
+                    it.next(), partitionKeys, hudiTable.getName());
+            if (hiveExpr != null) {
+                hivePredicates.add(hiveExpr);
+            }
+        }
+        int count = hivePredicates.size();
+        // combine all predicates by `and`
+        // compoundExprs must have at least 2 predicates
+        if (count >= 2) {
+            hivePartitionPredicate = HiveMetaStoreClientHelper.getCompoundExpr(hivePredicates, "and");
+        } else if (count == 1) {
+            // only one predicate
+            hivePartitionPredicate = (ExprNodeGenericFuncDesc) hivePredicates.get(0);
+        } else {
+            // have no predicate, make a dummy predicate "1=1" to get all partitions
+            HiveMetaStoreClientHelper.ExprBuilder exprBuilder =
+                    new HiveMetaStoreClientHelper.ExprBuilder(hudiTable.getName());
+            hivePartitionPredicate = exprBuilder.val(TypeInfoFactory.intTypeInfo, 1)
+                    .val(TypeInfoFactory.intTypeInfo, 1)
+                    .pred("=", 2).build();
+        }
+    }
+
+    private InputSplit[] getSplits() throws UserException, IOException {
+        String splitsPath = basePath;
+        if (partitionKeys.size() > 0) {
+            extractHivePartitionPredicate();
+
+            String metaStoreUris = hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
+            List<Partition> hivePartitions =
+                    HiveMetaStoreClientHelper.getHivePartitions(metaStoreUris, remoteHiveTable, hivePartitionPredicate);
+            splitsPath = hivePartitions.stream()
+                    .map(x -> x.getSd().getLocation()).collect(Collectors.joining(","));
+        }
+
+        Configuration configuration = new Configuration();
+        InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, inputFormatName, false);
+        // always get fileSplits from the InputFormat,
+        // because all hoodie input formats have the UseFileSplitsFromInputFormat annotation
+        JobConf jobConf = new JobConf(configuration);
+        FileInputFormat.setInputPaths(jobConf, splitsPath);
+        InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 0);
+        return inputSplits;
+    }
+
+    // If fileFormat is not null, we use fileFormat instead of checking the file's suffix
+    protected void buildScanRange() throws UserException, IOException {
+        scanRangeLocations = Lists.newArrayList();
+        InputSplit[] inputSplits = getSplits();
+        if (inputSplits.length == 0) {
+            return;
+        }
+
+        THdfsParams hdfsParams = new THdfsParams();
+        String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString();
+        String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
+        String fsName = fullPath.replace(filePath, "");
+        hdfsParams.setFsName(fsName);
+        Log.debug("Hudi path's host is " + fsName);
+
+        TFileFormatType formatType = null;
+        if (this.inputFormatName.toLowerCase(Locale.ROOT).contains("parquet")) {
+            formatType = TFileFormatType.FORMAT_PARQUET;
+        } else if (this.inputFormatName.toLowerCase(Locale.ROOT).contains("orc")) {
+            formatType = TFileFormatType.FORMAT_ORC;
+        } else {
+            throw new UserException("unsupported hudi table type [" + this.inputFormatName + "].");
+        }
+
+        ParamCreateContext context = getParamCreateContexts().get(0);
+        for (InputSplit split : inputSplits) {
+            FileSplit fileSplit = (FileSplit) split;
+
+            TScanRangeLocations curLocations = newLocations(context.params, brokerDesc);
+            List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
+                    getPartitionKeys());
+            int numberOfColumnsFromFile = context.slotDescByName.size() - partitionValuesFromPath.size();
+
+            TBrokerRangeDesc rangeDesc =
createBrokerRangeDesc(fileSplit, formatType, + partitionValuesFromPath, numberOfColumnsFromFile, brokerDesc); + rangeDesc.setHdfsParams(hdfsParams); + rangeDesc.setReadByColumnDef(true); + + curLocations.getScanRange().getBrokerScanRange().addToRanges(rangeDesc); + Log.debug("Assign to backend " + curLocations.getLocations().get(0).getBackendId() + + " with hudi split: " + fileSplit.getPath() + + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")"); + + // Put the last file + if (curLocations.getScanRange().getBrokerScanRange().isSetRanges()) { + scanRangeLocations.add(curLocations); + } + } + } + + private TBrokerRangeDesc createBrokerRangeDesc(FileSplit fileSplit, + TFileFormatType formatType, + List columnsFromPath, int numberOfColumnsFromFile, + BrokerDesc brokerDesc) { + TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc(); + rangeDesc.setFileType(brokerDesc.getFileType()); + rangeDesc.setFormatType(formatType); + rangeDesc.setPath(fileSplit.getPath().toUri().getPath()); + rangeDesc.setSplittable(true); + rangeDesc.setStartOffset(fileSplit.getStart()); + rangeDesc.setSize(fileSplit.getLength()); + rangeDesc.setNumOfColumnsFromFile(numberOfColumnsFromFile); + rangeDesc.setColumnsFromPath(columnsFromPath); + // set hdfs params for hdfs file type. + switch (brokerDesc.getFileType()) { + case FILE_HDFS: + BrokerUtil.generateHdfsParam(brokerDesc.getProperties(), rangeDesc); + break; + default: + break; + } + return rangeDesc; + } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + if (!isLoad()) { + output.append(prefix).append("TABLE: ").append(hudiTable.getName()).append("\n"); + output.append(prefix).append("PATH: ") + .append(hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS)).append("\n"); + } + return output.toString(); + } + + /** + * Analyze columns from path, the partition columns. 
+ */ + private void analyzeColumnFromPath() { + for (String colName : partitionKeys) { + ImportColumnDesc importColumnDesc = new ImportColumnDesc(colName, null); + parsedColumnExprList.add(importColumnDesc); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java index 6bdda5272e72a8..4af73caf5105f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java @@ -57,7 +57,10 @@ public void init(Analyzer analyzer) throws UserException { @Override protected void initFileGroup() throws UserException { - fileGroups = Lists.newArrayList(new BrokerFileGroup(icebergTable)); + fileGroups = Lists.newArrayList( + new BrokerFileGroup(icebergTable.getId(), + null, + icebergTable.getFileFormat())); brokerDesc = new BrokerDesc("IcebergTableDesc", icebergTable.getStorageType(), icebergTable.getIcebergProperties()); targetTable = icebergTable; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 8d125c12047126..468176b27cca05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1708,6 +1708,10 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "IcebergScanNode", null, -1); break; + case HUDI: + scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "HudiScanNode", + null, -1); + break; default: break; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java index a9ce32b84ee176..064593e3cbbc9a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java @@ -291,4 +291,27 @@ public void testCreateHudiTable() throws UserException { + "\"hudi.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n" + "\"hudi.table\" = \"test\")", stmt.toString()); } + + @Test + public void testCreateHudiTableWithSchema() throws UserException { + Map properties = new HashMap<>(); + properties.put("hudi.database", "doris"); + properties.put("hudi.table", "test"); + properties.put("hudi.hive.metastore.uris", "thrift://127.0.0.1:9087"); + CreateTableStmt stmt = new CreateTableStmt(false, true, tblName, "hudi", properties, ""); + ColumnDef idCol = new ColumnDef("id", TypeDef.create(PrimitiveType.INT)); + stmt.addColumnDef(idCol); + ColumnDef nameCol = new ColumnDef("name", TypeDef.create(PrimitiveType.INT), false, + null, true, ColumnDef.DefaultValue.NOT_SET, ""); + stmt.addColumnDef(nameCol); + stmt.analyze(analyzer); + + Assert.assertEquals("CREATE EXTERNAL TABLE `testCluster:db1`.`table1` (\n" + + " `id` int(11) NOT NULL COMMENT \"\",\n" + + " `name` int(11) NULL COMMENT \"\"\n" + + ") ENGINE = hudi\n" + + "PROPERTIES (\"hudi.database\" = \"doris\",\n" + + "\"hudi.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n" + + "\"hudi.table\" = \"test\")", stmt.toString()); + } } diff --git a/fe/pom.xml b/fe/pom.xml index 40406208cfcc75..7c1045346ee197 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -323,6 +323,12 @@ under the License. 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>2.8.0</version>
+            <scope>compile</scope>
+        </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>fe-common</artifactId>
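
Reviewer note: a minimal usage sketch, assuming only the property names exercised by the new
CreateTableStmtTest cases. The database/table names and metastore URI below are the test
fixtures, not real endpoints, and hudi_tbl and dt are hypothetical names used purely for
illustration. The column list can be omitted: Analyzer.resolveTableRef now calls
HudiUtils.resolveHudiTable to fill the schema from the Hive metastore when the Doris-side
schema is empty, and HudiScanNode pushes predicates on partition columns down to the
metastore before listing file splits.

    CREATE EXTERNAL TABLE db1.hudi_tbl
    ENGINE = hudi
    PROPERTIES (
        "hudi.database" = "doris",
        "hudi.table" = "test",
        "hudi.hive.metastore.uris" = "thrift://127.0.0.1:9087"
    );

    -- dt is a hypothetical partition column; its values reach the scan as columnsFromPath
    SELECT * FROM db1.hudi_tbl WHERE dt = '2021-01-01';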