From ded1f8f0e764dce1b54bb0f110132d48209bd878 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Wed, 7 Feb 2024 11:16:44 +0800 Subject: [PATCH 1/5] opt --- .../source/HudiCachedPartitionProcessor.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java index eb09c5efb59f56..0f2ff36e47953c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java @@ -19,9 +19,9 @@ import org.apache.doris.common.Config; import org.apache.doris.datasource.CacheException; -import org.apache.doris.datasource.TablePartitionValues; -import org.apache.doris.datasource.TablePartitionValues.TablePartitionKey; -import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.planner.external.TablePartitionValues; +import org.apache.doris.planner.external.TablePartitionValues.TablePartitionKey; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; @@ -31,6 +31,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.List; @@ -39,6 +41,7 @@ import java.util.stream.Collectors; public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { + private static final Logger LOG = LoggerFactory.getLogger(HudiCachedPartitionProcessor.class); private final long catalogId; private final Executor executor; private final LoadingCache partitionCache; @@ -137,7 +140,13 @@ public TablePartitionValues 
getPartitionValues(HMSExternalTable table, HoodieTab if (lastTimestamp <= lastUpdateTimestamp) { return partitionValues; } - List partitionNames = getAllPartitionNames(tableMetaClient); + HMSExternalCatalog catalog = (HMSExternalCatalog) table.getCatalog(); + List partitionNames; + partitionNames = catalog.getClient().listPartitionNames(table.getDbName(), table.getName()); + if (partitionNames.size() == 0) { + LOG.warn("Failed to get partitions from hms api, switch it from hudi api."); + partitionNames = getAllPartitionNames(tableMetaClient); + } List partitionColumnsList = Arrays.asList(partitionColumns.get()); partitionValues.cleanPartitions(); partitionValues.addPartitions(partitionNames, From fafe6c1b284074f7fc0b02b1a084ded27c173cf2 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Wed, 7 Feb 2024 18:15:28 +0800 Subject: [PATCH 2/5] add comment --- .../datasource/hudi/source/HudiCachedPartitionProcessor.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java index 0f2ff36e47953c..5996dd0c661ebe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java @@ -142,6 +142,10 @@ public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTab } HMSExternalCatalog catalog = (HMSExternalCatalog) table.getCatalog(); List partitionNames; + // When a Hudi table is synchronized to HMS, the partition information is also synchronized, + // so even if the metastore is not enabled in the Hudi table + // (for example, if the Metastore is false for a Hudi table created with Flink), + // we can still obtain the partition information through the HMS API. 
partitionNames = catalog.getClient().listPartitionNames(table.getDbName(), table.getName()); if (partitionNames.size() == 0) { LOG.warn("Failed to get partitions from hms api, switch it from hudi api."); From 8297bf6f076beef37eacac46dfc9ba6eab238120 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Mon, 19 Feb 2024 11:38:10 +0800 Subject: [PATCH 3/5] add useHiveSyncPartition --- .../datasource/hive/HMSExternalTable.java | 11 ++++++++ .../source/HudiCachedPartitionProcessor.java | 25 +++++++++++-------- .../datasource/hudi/source/HudiScanNode.java | 8 ++++-- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index c841c749fb20ad..62b2c35b8c5c9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -104,6 +104,9 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI private static final String SPARK_STATS_MAX_LEN = ".avgLen"; private static final String SPARK_STATS_HISTOGRAM = ".histogram"; + private static final String USE_HIVE_SYNC_PARTITION = "use_hive_sync_partition"; + + static { SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet(); SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"); @@ -227,6 +230,14 @@ public boolean isHoodieCowTable() { return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName); } + /** + * Some data lakes (such as Hudi) will synchronize their partition information to HMS, + * then we can quickly obtain the partition information of the table from HMS. 
+ */ + public boolean useHiveSyncPartition() { + return Boolean.parseBoolean(catalog.getProperties().getOrDefault(USE_HIVE_SYNC_PARTITION, "false")); + } + /** * Now we only support three file input format hive tables: parquet/orc/text. * Support managed_table and external_table. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java index 5996dd0c661ebe..8b7031082d615a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java @@ -84,7 +84,7 @@ public void cleanTablePartitions(String dbName, String tblName) { } public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table, - HoodieTableMetaClient tableMetaClient, String timestamp) { + HoodieTableMetaClient tableMetaClient, String timestamp, boolean useHiveSyncPartition) { Preconditions.checkState(catalogId == table.getCatalog().getId()); Option partitionColumns = tableMetaClient.getTableConfig().getPartitionFields(); if (!partitionColumns.isPresent()) { @@ -97,7 +97,7 @@ public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table, } long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp()); if (Long.parseLong(timestamp) == lastTimestamp) { - return getPartitionValues(table, tableMetaClient); + return getPartitionValues(table, tableMetaClient, useHiveSyncPartition); } List partitionNameAndValues = getPartitionNamesBeforeOrEquals(timeline, timestamp); List partitionNames = Arrays.asList(partitionColumns.get()); @@ -108,7 +108,8 @@ public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table, return partitionValues; } - public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient) + public 
TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient, + boolean useHiveSyncPartition) throws CacheException { Preconditions.checkState(catalogId == table.getCatalog().getId()); Option partitionColumns = tableMetaClient.getTableConfig().getPartitionFields(); @@ -142,13 +143,17 @@ public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTab } HMSExternalCatalog catalog = (HMSExternalCatalog) table.getCatalog(); List partitionNames; - // When a Hudi table is synchronized to HMS, the partition information is also synchronized, - // so even if the metastore is not enabled in the Hudi table - // (for example, if the Metastore is false for a Hudi table created with Flink), - // we can still obtain the partition information through the HMS API. - partitionNames = catalog.getClient().listPartitionNames(table.getDbName(), table.getName()); - if (partitionNames.size() == 0) { - LOG.warn("Failed to get partitions from hms api, switch it from hudi api."); + if (useHiveSyncPartition) { + // When a Hudi table is synchronized to HMS, the partition information is also synchronized, + // so even if the metastore is not enabled in the Hudi table + // (for example, if the Metastore is false for a Hudi table created with Flink), + // we can still obtain the partition information through the HMS API. 
+ partitionNames = catalog.getClient().listPartitionNames(table.getDbName(), table.getName()); + if (partitionNames.size() == 0) { + LOG.warn("Failed to get partitions from hms api, switch it from hudi api."); + partitionNames = getAllPartitionNames(tableMetaClient); + } + } else { partitionNames = getAllPartitionNames(tableMetaClient); } List partitionColumnsList = Arrays.asList(partitionColumns.get()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 144d308140e60e..4f4b1c3e8ff9c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -83,6 +83,8 @@ public class HudiScanNode extends HiveScanNode { private final AtomicLong noLogsSplitNum = new AtomicLong(0); + private final boolean useHiveSyncPartition; + /** * External file scan node for Query Hudi table * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv @@ -102,6 +104,7 @@ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumn LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", hmsTable.getName()); } } + useHiveSyncPartition = hmsTable.useHiveSyncPartition(); } @Override @@ -171,9 +174,10 @@ private List getPrunedPartitions( .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog()); TablePartitionValues partitionValues; if (snapshotTimestamp.isPresent()) { - partitionValues = processor.getSnapshotPartitionValues(hmsTable, metaClient, snapshotTimestamp.get()); + partitionValues = processor.getSnapshotPartitionValues( + hmsTable, metaClient, snapshotTimestamp.get(), useHiveSyncPartition); } else { - partitionValues = processor.getPartitionValues(hmsTable, metaClient); + partitionValues = processor.getPartitionValues(hmsTable, metaClient, 
useHiveSyncPartition); } if (partitionValues != null) { // 2. prune partitions by expr From a714c38b8edc9b27afe4736c7729a046f4d1b3e6 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Mon, 19 Feb 2024 15:48:58 +0800 Subject: [PATCH 4/5] doc --- docs/en/docs/lakehouse/multi-catalog/hudi.md | 6 ++++++ docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/docs/en/docs/lakehouse/multi-catalog/hudi.md b/docs/en/docs/lakehouse/multi-catalog/hudi.md index 52892db2df2174..a52c2370ced161 100644 --- a/docs/en/docs/lakehouse/multi-catalog/hudi.md +++ b/docs/en/docs/lakehouse/multi-catalog/hudi.md @@ -55,6 +55,12 @@ CREATE CATALOG hudi PROPERTIES ( ); ``` +Optional configuration parameters: + +|name|description|default| +|---|---|---| +|use_hive_sync_partition|Whether to read partition names from the partition information that Hudi has synchronized to HMS (Hive Metastore) instead of listing them through the Hudi API; if HMS returns no partitions, the Hudi API is used as a fallback|false| + ## Column Type Mapping Same as that in Hive Catalogs. See the relevant section in [Hive](./hive.md). diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md index b619283cacf5bc..38bb26d3bc79ed 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md @@ -55,6 +55,12 @@ CREATE CATALOG hudi PROPERTIES ( ); ``` +可选配置参数: + +|参数名|说明|默认值| +|---|---|---| +|use_hive_sync_partition|使用hms已同步的分区数据|false| + ## 列类型映射 和 Hive Catalog 一致,可参阅 [Hive Catalog](./hive.md) 中 **列类型映射** 一节。 From c396702f6def4385ff68fd91b9ed38588162660c Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Tue, 20 Feb 2024 18:45:12 +0800 Subject: [PATCH 5/5] merge --- .../hudi/source/HudiCachedPartitionProcessor.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java index 8b7031082d615a..90c89dbbda2c76 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java @@ -19,9 +19,10 @@ import org.apache.doris.common.Config; import org.apache.doris.datasource.CacheException; -import org.apache.doris.datasource.HMSExternalCatalog; -import org.apache.doris.planner.external.TablePartitionValues; -import org.apache.doris.planner.external.TablePartitionValues.TablePartitionKey; +import org.apache.doris.datasource.TablePartitionValues; +import org.apache.doris.datasource.TablePartitionValues.TablePartitionKey; +import org.apache.doris.datasource.hive.HMSExternalCatalog; +import org.apache.doris.datasource.hive.HMSExternalTable; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder;