7 changes: 6 additions & 1 deletion be/src/vec/exec/format/table/hudi_jni_reader.cpp
@@ -36,6 +36,7 @@ class Block;

namespace doris::vectorized {

const std::string HudiJniReader::HOODIE_CONF_PREFIX = "hoodie.";
const std::string HudiJniReader::HADOOP_CONF_PREFIX = "hadoop_conf.";

HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
@@ -67,7 +68,11 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,

// Use compatible hadoop client to read data
for (auto& kv : _scan_params.properties) {
params[HADOOP_CONF_PREFIX + kv.first] = kv.second;
if (kv.first.starts_with(HOODIE_CONF_PREFIX)) {
params[kv.first] = kv.second;
} else {
params[HADOOP_CONF_PREFIX + kv.first] = kv.second;
}
}

_jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner", params,
1 change: 1 addition & 0 deletions be/src/vec/exec/format/table/hudi_jni_reader.h
@@ -46,6 +46,7 @@ class HudiJniReader : public GenericReader {
ENABLE_FACTORY_CREATOR(HudiJniReader);

public:
static const std::string HOODIE_CONF_PREFIX;
static const std::string HADOOP_CONF_PREFIX;

HudiJniReader(const TFileScanRangeParams& scan_params, const THudiFileDesc& hudi_params,
3 changes: 3 additions & 0 deletions docs/en/docs/lakehouse/multi-catalog/hudi.md
@@ -59,6 +59,9 @@ CREATE CATALOG hudi PROPERTIES (

Same as that in Hive Catalogs. See the relevant section in [Hive](./hive.md).

## Skip Merge
When generating a Hudi MOR (merge-on-read) table, Spark also creates a read-optimized table with the `_ro` suffix. Doris skips the log files when reading a read-optimized table. Note that Doris determines whether a table is read optimized by its Hive inputformat, not by the `_ro` suffix. Users can check whether the inputformats of the cow / mor / read-optimized tables differ through the `SHOW CREATE TABLE` command. In addition, Doris supports adding hoodie-related configurations to catalog properties, which are compatible with [Spark Datasource Configs](https://hudi.apache.org/docs/configurations/#Read-Options), so users can add `hoodie.datasource.merge.type=skip_merge` to catalog properties to skip merging log files.
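
For example, a catalog that always skips merging log files for MOR tables can be created as follows (a minimal sketch; the metastore address is a placeholder and should be replaced with your own):

```
CREATE CATALOG hudi PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://127.0.0.1:9083',
    'hoodie.datasource.merge.type' = 'skip_merge'
);
```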

## Query Optimization
Doris uses the parquet native reader to read the data files of COW tables, and uses the Java SDK (by calling hudi-bundle through JNI) to read the data files of MOR tables. In the `upsert` scenario, the MOR table may still contain base files that have not been updated, and these files can be read through the parquet native reader. Users can view the execution plan of a hudi scan through the [explain](../../advanced/best-practice/query-analysis.md) command, where `hudiNativeReadSplits` indicates how many split files are read through the parquet native reader.
```
4 changes: 4 additions & 0 deletions docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md
@@ -59,6 +59,10 @@ CREATE CATALOG hudi PROPERTIES (

Same as that in Hive Catalogs. See the **Column Type Mapping** section in [Hive Catalog](./hive.md).

## Skip Merge
When generating a Hudi MOR table, Spark also creates a read-optimized table with the `_ro` suffix, and Doris skips merging the log files when reading a read-optimized table. Doris determines whether a table is read optimized by its Hive inputformat, not by the `_ro` suffix; users can check whether the inputformats of the cow / mor / read-optimized tables differ through the `SHOW CREATE TABLE` command.
In addition, Doris supports adding hoodie-related configurations to catalog properties. The options are compatible with [Spark Datasource Configs](https://hudi.apache.org/docs/configurations/#Read-Options), so users can add `hoodie.datasource.merge.type=skip_merge` to catalog properties to skip merging log files.
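
For example (a minimal sketch; the metastore address is a placeholder and should be replaced with your own):

```
CREATE CATALOG hudi PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://127.0.0.1:9083',
    'hoodie.datasource.merge.type' = 'skip_merge'
);
```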

## Query Optimization

Doris uses the parquet native reader to read the data files of COW tables, and uses the Java SDK (by calling hudi-bundle through JNI) to read the data files of MOR tables. In the upsert scenario, the MOR table may still contain base files that have not been updated, and these files can be read through the parquet native reader. Users can view the execution plan of a hudi scan through the [explain](../../advanced/best-practice/query-analysis.md) command, where `hudiNativeReadSplits` indicates how many split files are read through the parquet native reader.
@@ -86,6 +86,9 @@ public class HudiJniScanner extends JniScanner {

static {
int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2 + 1, 4);
if (numThreads > 32) {
numThreads = Runtime.getRuntime().availableProcessors();
}
avroReadPool = Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder().setNameFormat("avro-log-reader-%d").build());
LOG.info("Create " + numThreads + " daemon threads to load avro logs");
@@ -176,10 +179,15 @@ public void open() throws IOException {
if (ugi != null) {
recordIterator = ugi.doAs(
(PrivilegedExceptionAction<Iterator<InternalRow>>) () -> new MORSnapshotSplitReader(
split).buildScanIterator(split.requiredFields(), new Filter[0]));
split).buildScanIterator(new Filter[0]));
} else {
recordIterator = new MORSnapshotSplitReader(split)
.buildScanIterator(split.requiredFields(), new Filter[0]);
.buildScanIterator(new Filter[0]);
}
if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) {
cachedResolvers.computeIfAbsent(Thread.currentThread().getId(),
threadId -> AVRO_RESOLVER_CACHE.get());
AVRO_RESOLVER_CACHE.get().clear();
}
} catch (Exception e) {
LOG.error("Failed to open hudi scanner, split params:\n" + debugString, e);
@@ -189,10 +197,6 @@ public void open() throws IOException {
}
isKilled.set(true);
executorService.shutdownNow();
if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) {
cachedResolvers.computeIfAbsent(Thread.currentThread().getId(),
threadId -> AVRO_RESOLVER_CACHE.get());
}
getRecordReaderTimeNs += System.nanoTime() - startTime;
});
try {
@@ -153,6 +153,8 @@ case class HoodieTableInformation(sparkSession: SparkSession,
metaClient: HoodieTableMetaClient,
timeline: HoodieTimeline,
tableConfig: HoodieTableConfig,
resolvedTargetFields: Array[String],
tableAvroSchema: Schema,
internalSchemaOpt: Option[InternalSchema])

/**
@@ -214,22 +216,7 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
* required to fetch table's Avro and Internal schemas
*/
protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
val schemaResolver = new TableSchemaResolver(tableInformation.metaClient)
val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
val avroSchema: Schema = tableInformation.internalSchemaOpt.map { is =>
AvroInternalSchemaConverter.convert(is, namespace + "." + name)
} orElse {
specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
} orElse {
split.schemaSpec.map(s => convertToAvroSchema(s, tableName))
} getOrElse {
Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
case Failure(e) =>
throw new HoodieSchemaException("Failed to fetch schema from the table", e)
}
}
(avroSchema, tableInformation.internalSchemaOpt)
(tableInformation.tableAvroSchema, tableInformation.internalSchemaOpt)
}

protected lazy val tableStructSchema: StructType = convertAvroSchemaToStructType(tableAvroSchema)
@@ -280,13 +267,13 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
}

def buildScanIterator(requiredColumns: Array[String], filters: Array[Filter]): Iterator[InternalRow] = {
def buildScanIterator(filters: Array[Filter]): Iterator[InternalRow] = {
// NOTE: PLEASE READ CAREFULLY BEFORE MAKING CHANGES
// *Appending* additional columns to the ones requested by the caller is not a problem, as those
// will be eliminated by the caller's projection;
// (!) Please note, however, that it's critical to avoid _reordering_ of the requested columns as this
// will break the upstream projection
val targetColumns: Array[String] = appendMandatoryColumns(requiredColumns)
val targetColumns: Array[String] = appendMandatoryColumns(tableInformation.resolvedTargetFields)
// NOTE: We explicitly fallback to default table's Avro schema to make sure we avoid unnecessary Catalyst > Avro
// schema conversion, which is lossy in nature (for ex, it doesn't preserve original Avro type-names) and
// could have an effect on subsequent de-/serializing records in some exotic scenarios (when Avro unions
@@ -663,11 +650,36 @@ object BaseSplitReader {
None
}
}
val tableName = metaClient.getTableConfig.getTableName
val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
val avroSchema: Schema = internalSchemaOpt.map { is =>
AvroInternalSchemaConverter.convert(is, namespace + "." + name)
} orElse {
specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
} orElse {
split.schemaSpec.map(s => convertToAvroSchema(s, tableName))
} getOrElse {
Try(schemaResolver.getTableAvroSchema) match {
case Success(schema) => schema
case Failure(e) =>
throw new HoodieSchemaException("Failed to fetch schema from the table", e)
}
}

// match column name in lower case
val colNames = internalSchemaOpt.map { internalSchema =>
internalSchema.getAllColsFullName.asScala.map(f => f.toLowerCase -> f).toMap
} getOrElse {
avroSchema.getFields.asScala.map(f => f.name().toLowerCase -> f.name()).toMap
}
val resolvedTargetFields = split.requiredFields.map(field => colNames.getOrElse(field.toLowerCase, field))

HoodieTableInformation(sparkSession,
metaClient,
timeline,
metaClient.getTableConfig,
resolvedTargetFields,
avroSchema,
internalSchemaOpt)
}
}
@@ -98,8 +98,9 @@ class HoodieMORRecordIterator(config: Configuration,

case split => mergeType match {
case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
val reader = fileReaders.requiredSchemaReaderSkipMerging
new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, config)
// val reader = fileReaders.requiredSchemaReaderSkipMerging
// new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, config)
throw new UnsupportedOperationException("Skip merge is optimized by native read")

case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
val reader = pickBaseFileReader()
@@ -79,7 +79,7 @@ public class HudiScanNode extends HiveScanNode {

private static final Logger LOG = LogManager.getLogger(HudiScanNode.class);

private final boolean isCowTable;
private final boolean isCowOrRoTable;

private final AtomicLong noLogsSplitNum = new AtomicLong(0);

@@ -91,17 +91,18 @@ public class HudiScanNode extends HiveScanNode {
*/
public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv);
isCowTable = hmsTable.isHoodieCowTable();
if (isCowTable) {
LOG.debug("Hudi table {} can read as cow table", hmsTable.getName());
isCowOrRoTable = hmsTable.isHoodieCowTable() || "skip_merge".equals(
hmsTable.getCatalogProperties().get("hoodie.datasource.merge.type"));
if (isCowOrRoTable) {
LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getName());
} else {
LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", hmsTable.getName());
}
}

@Override
public TFileFormatType getFileFormatType() throws UserException {
if (isCowTable) {
if (isCowOrRoTable) {
return super.getFileFormatType();
} else {
// Use jni to read hudi table in BE
@@ -124,7 +125,7 @@ protected void doInitialize() throws UserException {

@Override
protected Map<String, String> getLocationProperties() throws UserException {
if (isCowTable) {
if (isCowOrRoTable) {
return super.getLocationProperties();
} else {
// HudiJniScanner uses hadoop client to read data.
@@ -291,7 +292,7 @@ public List<Split> getSplits() throws UserException {
HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
timeline, statuses.toArray(new FileStatus[0]));

if (isCowTable) {
if (isCowOrRoTable) {
fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
noLogsSplitNum.incrementAndGet();
String filePath = baseFile.getPath();
@@ -312,7 +313,9 @@ public List<Split> getSplits() throws UserException {
noLogsSplitNum.incrementAndGet();
}

HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize,
// no base file, use log file to parse file type
String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath;
HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize,
new String[0], partition.getPartitionValues());
split.setTableFormatType(TableFormatType.HUDI);
split.setDataFilePath(filePath);
25 changes: 25 additions & 0 deletions regression-test/data/external_table_p2/hive/test_hive_hudi.out
@@ -0,0 +1,25 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !optimize_table --
20230605145009209 20230605145009209_0_0 rowId:row_1 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_1 2021-01-01 0 bob v_0 toBeDel0 0 1000000
20230605145403388 20230605145403388_2_0 rowId:row_1 partitionId=2011-11-11/versionId=v_1 dbff8acb-42bc-400c-be33-47d9e0bae9b7-0_2-83-222_20230605145403388.parquet row_1 2011-11-11 1 bob v_1 toBeDel1 0 1000001
20230605145009209 20230605145009209_0_1 rowId:row_2 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_2 2021-01-01 0 john v_0 toBeDel0 0 1000000
20230605145403388 20230605145403388_1_0 rowId:row_4 partitionId=2021-02-01/versionId=v_4 e33d645c-6e2f-41f3-b8d6-f658771bd460-0_1-83-220_20230605145403388.parquet row_4 2021-02-01 4 ashin v_4 toBeDel4 0 1000004

-- !merge_on_read --
20230801201335031 20230801201335031_0_1 rowId:row_1 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0 row_1 2021-01-01 0 bob v_0 toBeDel0 1 1000000
20230801201335031 20230801201335031_1_1 rowId:row_1 partitionId=2011-11-11/versionId=v_1 dbff8acb-42bc-400c-be33-47d9e0bae9b7-0 row_1 2011-11-11 1 bob v_1 toBeDel1 1 1000001
20230605145009209 20230605145009209_0_1 rowId:row_2 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_2 2021-01-01 0 john v_0 toBeDel0 0 1000000
20230605145403388 20230605145403388_1_0 rowId:row_4 partitionId=2021-02-01/versionId=v_4 e33d645c-6e2f-41f3-b8d6-f658771bd460-0_1-83-220_20230605145403388.parquet row_4 2021-02-01 4 ashin v_4 toBeDel4 0 1000004

-- !lowercase_column --
row_1 2021-01-01 0 v_0
row_1 2011-11-11 1 v_1
row_2 2021-01-01 0 v_0
row_4 2021-02-01 4 v_4

-- !skip_merge --
20230605145009209 20230605145009209_0_0 rowId:row_1 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_1 2021-01-01 0 bob v_0 toBeDel0 0 1000000
20230605145403388 20230605145403388_2_0 rowId:row_1 partitionId=2011-11-11/versionId=v_1 dbff8acb-42bc-400c-be33-47d9e0bae9b7-0_2-83-222_20230605145403388.parquet row_1 2011-11-11 1 bob v_1 toBeDel1 0 1000001
20230605145009209 20230605145009209_0_1 rowId:row_2 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_2 2021-01-01 0 john v_0 toBeDel0 0 1000000
20230605145403388 20230605145403388_1_0 rowId:row_4 partitionId=2021-02-01/versionId=v_4 e33d645c-6e2f-41f3-b8d6-f658771bd460-0_1-83-220_20230605145403388.parquet row_4 2021-02-01 4 ashin v_4 toBeDel4 0 1000004

@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_hive_hudi", "p2,external,hive,hudi") {
String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
String catalog_name = "test_hive_hudi"

sql """drop catalog if exists ${catalog_name};"""
sql """
create catalog if not exists ${catalog_name} properties (
'hadoop.username'='hadoop',
'type'='hms',
'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
);
"""

sql """use ${catalog_name}.hudi_catalog"""
// read optimized table with partitions
qt_optimize_table """select * from partitioned_mor_ro order by rowid, versionid"""
// merge on read table with updated records (realtime view)
qt_merge_on_read """select * from partitioned_mor_rt order by rowid, versionid"""
// match column names in lower case
qt_lowercase_column """select RoWiD, PaRtiTionID, PrEComB, VerSIonID from partitioned_mor_rt order by rowid, versionid"""


// skip logs
sql """drop catalog if exists ${catalog_name};"""
sql """
create catalog if not exists ${catalog_name} properties (
'hadoop.username'='hadoop',
'type'='hms',
'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}',
'hoodie.datasource.merge.type'='skip_merge'
);
"""
// realtime view with updates; log merging is skipped, so the result is the same as partitioned_mor_ro
qt_skip_merge """select * from partitioned_mor_rt order by rowid, versionid"""

sql """drop catalog if exists ${catalog_name};"""
}
}