7 changes: 7 additions & 0 deletions fe/fe-core/pom.xml
@@ -68,6 +68,13 @@ under the License.
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client</artifactId>
<version>2.7.4</version>
<scope>compile</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
@@ -38,6 +38,8 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.VecNotImplException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.external.hudi.HudiTable;
import org.apache.doris.external.hudi.HudiUtils;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.RuntimeFilter;
import org.apache.doris.qe.ConnectContext;
@@ -617,7 +619,7 @@ public TableRef resolveTableRef(TableRef tableRef) throws AnalysisException {
partition -> partition.getState() == PartitionState.RESTORE
).collect(Collectors.toList()).isEmpty();

if(!isNotRestoring){
if (!isNotRestoring) {
// if restoring with partitions, the status check is pushed down to OlapScanNode::computePartitionInfo to
// support queries on partitions that are not being restored.
} else {
@@ -626,6 +628,11 @@ public TableRef resolveTableRef(TableRef tableRef) throws AnalysisException {
}
}

if (table.getType() == TableType.HUDI && table.getFullSchema().isEmpty()) {
// resolve the hudi table's schema when the schema stored in doris meta is empty
table = HudiUtils.resolveHudiTable((HudiTable) table);
}
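HudiUtils.resolveHudiTable is introduced by this change, but its body is not part of the hunks shown here. A rough, hedged sketch of what such a resolver could look like, assuming it pulls the remote schema through the Hive metastore and converts it with the new HiveUtil helper; getRemoteHiveTable and setNewFullSchema are assumed names, not necessarily the real implementation:

public static HudiTable resolveHudiTable(HudiTable hudiTable) throws AnalysisException {
    // look up the table definition in the Hive metastore (assumed helper)
    org.apache.hadoop.hive.metastore.api.Table remoteTable = getRemoteHiveTable(hudiTable);
    // convert the Hive columns into Doris columns
    List<Column> fullSchema = HiveUtil.transformHiveSchema(remoteTable.getSd().getCols());
    // attach the resolved schema to the Doris-side table object (assumed setter)
    hudiTable.setNewFullSchema(fullSchema);
    return hudiTable;
}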

// tableName.getTbl() stores the table name specified by the user in the from statement.
// In the case of case-sensitive table names, the value of tableName.getTbl() is the same as table.getName().
// However, since the system view is not case-sensitive, table.getName() gets the lowercase view name,
@@ -4142,6 +4142,14 @@ private void createHudiTable(Database db, CreateTableStmt stmt) throws DdlExcept
if (!hudiTable.getFullSchema().isEmpty()) {
HudiUtils.validateColumns(hudiTable, hiveTable);
}
switch (hiveTable.getTableType()) {
case "EXTERNAL_TABLE":
case "MANAGED_TABLE":
break;
case "VIRTUAL_VIEW":
default:
throw new DdlException("unsupported hudi table type [" + hiveTable.getTableType() + "].");
}
// check hive table if exists in doris database
if (!db.createTableWithLock(hudiTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
@@ -158,7 +158,7 @@ public static void dropClient(HiveMetaStoreClient client) {
}

/**
* Get data files of partitions in hive table, filter by partition predicate
* Get data files of partitions in hive table, filter by partition predicate.
* @param hiveTable the hive table defined in Doris
* @param hivePartitionPredicate partition predicate used to filter hive partitions
* @param fileStatuses output list that collects the matched data files' status
@@ -167,22 +167,13 @@ public static void dropClient(HiveMetaStoreClient client) {
* @throws DdlException
*/
public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate,
List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl) throws DdlException {
HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));

List<TBrokerFileStatus> fileStatuses,
Table remoteHiveTbl) throws DdlException {
List<RemoteIterator<LocatedFileStatus>> remoteIterators;
if (remoteHiveTbl.getPartitionKeys().size() > 0) {
String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS);
// hive partitioned table, get file iterator from table partition sd info
List<Partition> hivePartitions = new ArrayList<>();
try {
client.listPartitionsByExpr(hiveTable.getHiveDb(), hiveTable.getHiveTable(),
SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), null, (short) -1, hivePartitions);
} catch (TException e) {
LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
throw new DdlException("Connect hive metastore failed. Error: " + e.getMessage());
} finally {
client.close();
}
List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, hivePartitionPredicate);
remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties());
} else {
// hive non-partitioned table, get file iterator from table sd info
@@ -219,6 +210,32 @@ public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDe
return hdfsUrl;
}

/**
* list partitions from hiveMetaStore.
*
* @param metaStoreUris hiveMetaStore uris
* @param remoteHiveTbl Hive table
* @param hivePartitionPredicate filter when list partitions
* @return a list of hive partitions
* @throws DdlException when connecting to hiveMetaStore fails.
*/
public static List<Partition> getHivePartitions(String metaStoreUris, Table remoteHiveTbl,
ExprNodeGenericFuncDesc hivePartitionPredicate) throws DdlException {
List<Partition> hivePartitions = new ArrayList<>();
HiveMetaStoreClient client = getClient(metaStoreUris);
try {
client.listPartitionsByExpr(remoteHiveTbl.getDbName(), remoteHiveTbl.getTableName(),
SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate),
null, (short) -1, hivePartitions);
} catch (TException e) {
LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
throw new DdlException("Connect hive metastore failed. Error: " + e.getMessage());
} finally {
client.close();
}
return hivePartitions;
}
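For illustration only, a hedged sketch of calling the extracted helper with a simple dt = '2021-01-01' partition predicate; the metastore URI, the column name, and the remoteHiveTbl variable are assumptions, and the ExprNode/UDF classes come from hive-exec and hive-serde:

ExprNodeDesc partCol = new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, "dt", null, true);
ExprNodeDesc partVal = new ExprNodeConstantDesc("2021-01-01");
ExprNodeGenericFuncDesc predicate = new ExprNodeGenericFuncDesc(
        TypeInfoFactory.booleanTypeInfo, new GenericUDFOPEqual(), Lists.newArrayList(partCol, partVal));
List<Partition> partitions = getHivePartitions("thrift://127.0.0.1:9083", remoteHiveTbl, predicate);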

private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions, Map<String, String> properties) throws DdlException {
List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
Configuration configuration = new Configuration(false);
@@ -0,0 +1,183 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.external.hive.util;

import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;

/**
* Hive util for creating or querying hive tables.
*/
public final class HiveUtil {
private static final Logger LOG = LogManager.getLogger(HiveUtil.class);

private HiveUtil() {
}

/**
* get an input format instance from inputFormatName.
*
* @param configuration configuration used to build the JobConf that loads the class
* @param inputFormatName inputFormat class name
* @param symlinkTarget if true, use the target TextInputFormat when the inputFormat is SymlinkTextInputFormat
* @return an instance of the inputFormat
* @throws UserException when the class is not found.
*/
public static InputFormat<?, ?> getInputFormat(Configuration configuration,
String inputFormatName, boolean symlinkTarget) throws UserException {
try {
JobConf jobConf = new JobConf(configuration);

Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
if (symlinkTarget && (inputFormatClass == SymlinkTextInputFormat.class)) {
// symlink targets are always TextInputFormat
inputFormatClass = TextInputFormat.class;
}

return ReflectionUtils.newInstance(inputFormatClass, jobConf);
} catch (ClassNotFoundException | RuntimeException e) {
throw new UserException("Unable to create input format " + inputFormatName, e);
}
}
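A minimal, hedged usage sketch of this method; the TextInputFormat class name is only an example value:

Configuration conf = new Configuration(false);
InputFormat<?, ?> inputFormat =
        HiveUtil.getInputFormat(conf, "org.apache.hadoop.mapred.TextInputFormat", false);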

@SuppressWarnings({"unchecked", "RedundantCast"})
private static Class<? extends InputFormat<?, ?>> getInputFormatClass(JobConf conf, String inputFormatName)
throws ClassNotFoundException {
// CDH uses different names for Parquet
if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName)
|| "parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
return MapredParquetInputFormat.class;
}

Class<?> clazz = conf.getClassByName(inputFormatName);
return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
}

/**
* transform hiveSchema to Doris schema.
*
* @param hiveSchema hive schema
* @return doris schema
* @throws AnalysisException when the transformation fails.
*/
public static List<Column> transformHiveSchema(List<FieldSchema> hiveSchema) throws AnalysisException {
List<Column> newSchema = Lists.newArrayList();
for (FieldSchema hiveColumn : hiveSchema) {
try {
newSchema.add(HiveUtil.transformHiveField(hiveColumn));
} catch (UnsupportedOperationException e) {
LOG.warn("Unsupported data type in Doris, ignore column[{}], with error: {}",
hiveColumn.getName(), e.getMessage());
throw e;
}
}
return newSchema;
}
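A hedged usage sketch; the column names, Hive type strings, and comments below are made up for illustration:

List<FieldSchema> hiveSchema = Lists.newArrayList(
        new FieldSchema("id", "bigint", "row id"),
        new FieldSchema("tags", "array<string>", "tag list"));
// "bigint" becomes Type.BIGINT, "array<string>" becomes an ArrayType of Type.STRING
List<Column> dorisSchema = HiveUtil.transformHiveSchema(hiveSchema);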

/**
* transform hive field to doris column.
*
* @param field hive field to be transformed
* @return doris column
*/
public static Column transformHiveField(FieldSchema field) {
TypeInfo hiveTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
Type type = convertHiveTypeToDoris(hiveTypeInfo);
return new Column(field.getName(), type, false, null, true, null, field.getComment());
}

private static Type convertHiveTypeToDoris(TypeInfo hiveTypeInfo) {

switch (hiveTypeInfo.getCategory()) {
case PRIMITIVE: {
PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) hiveTypeInfo;
switch (primitiveTypeInfo.getPrimitiveCategory()) {
case VOID:
return Type.NULL;
case BOOLEAN:
return Type.BOOLEAN;
case BYTE:
return Type.TINYINT;
case SHORT:
return Type.SMALLINT;
case INT:
return Type.INT;
case LONG:
return Type.BIGINT;
case FLOAT:
return Type.FLOAT;
case DOUBLE:
return Type.DOUBLE;
case STRING:
return Type.STRING;
case CHAR:
return Type.CHAR;
case VARCHAR:
return Type.VARCHAR;
case DATE:
return Type.DATE;
case TIMESTAMP:
return Type.DATETIME;
case DECIMAL:
return Type.DECIMALV2;
default:
throw new UnsupportedOperationException("Unsupported type: "
+ primitiveTypeInfo.getPrimitiveCategory());
}
}
case LIST:
TypeInfo elementTypeInfo = ((ListTypeInfo) hiveTypeInfo)
.getListElementTypeInfo();
Type newType = null;
if (elementTypeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE) {
newType = convertHiveTypeToDoris(elementTypeInfo);
} else {
throw new UnsupportedOperationException("Unsupported type: " + hiveTypeInfo.toString());
}
return new ArrayType(newType);
case MAP:
case STRUCT:
case UNION:
default:
throw new UnsupportedOperationException("Unsupported type: " + hiveTypeInfo.toString());
}
}

}
@@ -117,7 +117,7 @@ public TTableDescriptor toThrift() {
thriftHudiTable.setTableName(getHmsTableName());
thriftHudiTable.setProperties(getTableProperties());

TTableDescriptor thriftTableDescriptor = new TTableDescriptor(getId(), TTableType.HUDI_TABLE,
TTableDescriptor thriftTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE,
fullSchema.size(), 0, getName(), "");
thriftTableDescriptor.setHudiTable(thriftHudiTable);
return thriftTableDescriptor;