Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/en/docs/lakehouse/multi-catalog/hudi.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ CREATE CATALOG hudi PROPERTIES (
);
```

Optional configuration parameters:

|name|description|default|
|---|---|---|
|use_hive_sync_partition|Whether to use the partition data already synchronized to HMS|false|

## Column Type Mapping

Same as that in Hive Catalogs. See the relevant section in [Hive](./hive.md).
Expand Down
6 changes: 6 additions & 0 deletions docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ CREATE CATALOG hudi PROPERTIES (
);
```

可选配置参数:

|参数名|说明|默认值|
|---|---|---|
|use_hive_sync_partition|使用hms已同步的分区数据|false|

## 列类型映射

和 Hive Catalog 一致,可参阅 [Hive Catalog](./hive.md) 中 **列类型映射** 一节。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public class HMSExternalTable extends ExternalTable {

private static final String NUM_ROWS = "numRows";

private static final String USE_HIVE_SYNC_PARTITION = "use_hive_sync_partition";

static {
SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
Expand Down Expand Up @@ -194,6 +196,14 @@ private boolean supportedHoodieTable() {
return inputFormatName != null && SUPPORTED_HUDI_FILE_FORMATS.contains(inputFormatName);
}

/**
 * Whether to read this table's partition information from HMS.
 * Some data lakes (such as Hudi) will synchronize their partition information to HMS,
 * then we can quickly obtain the partition information of the table from HMS.
 */
public boolean useHiveSyncPartition() {
    String enabled = catalog.getProperties().getOrDefault(USE_HIVE_SYNC_PARTITION, "false");
    return Boolean.parseBoolean(enabled);
}

public boolean isHoodieCowTable() {
if (remoteTable.getSd() == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void cleanTablePartitions(String dbName, String tblName) {
}

public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table,
HoodieTableMetaClient tableMetaClient, String timestamp) {
HoodieTableMetaClient tableMetaClient, String timestamp, boolean useHiveSyncPartition) {
Preconditions.checkState(catalogId == table.getCatalog().getId());
Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields();
if (!partitionColumns.isPresent()) {
Expand All @@ -98,7 +98,7 @@ public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table,
}
long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp());
if (Long.parseLong(timestamp) == lastTimestamp) {
return getPartitionValues(table, tableMetaClient);
return getPartitionValues(table, tableMetaClient, useHiveSyncPartition);
}
List<String> partitionNameAndValues = getPartitionNamesBeforeOrEquals(timeline, timestamp);
List<String> partitionNames = Arrays.asList(partitionColumns.get());
Expand All @@ -109,7 +109,8 @@ public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable table,
return partitionValues;
}

public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient)
public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient,
boolean useHiveSyncPartition)
throws CacheException {
Preconditions.checkState(catalogId == table.getCatalog().getId());
Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields();
Expand Down Expand Up @@ -143,13 +144,17 @@ public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTab
}
HMSExternalCatalog catalog = (HMSExternalCatalog) table.getCatalog();
List<String> partitionNames;
// When a Hudi table is synchronized to HMS, the partition information is also synchronized,
// so even if the metastore is not enabled in the Hudi table
// (for example, if the Metastore is false for a Hudi table created with Flink),
// we can still obtain the partition information through the HMS API.
partitionNames = catalog.getClient().listPartitionNames(table.getDbName(), table.getName());
if (partitionNames.size() == 0) {
LOG.warn("Failed to get partitions from hms api, switch it from hudi api.");
if (useHiveSyncPartition) {
// When a Hudi table is synchronized to HMS, the partition information is also synchronized,
// so even if the metastore is not enabled in the Hudi table
// (for example, if the Metastore is false for a Hudi table created with Flink),
// we can still obtain the partition information through the HMS API.
partitionNames = catalog.getClient().listPartitionNames(table.getDbName(), table.getName());
if (partitionNames.size() == 0) {
LOG.warn("Failed to get partitions from hms api, switch it from hudi api.");
partitionNames = getAllPartitionNames(tableMetaClient);
}
} else {
partitionNames = getAllPartitionNames(tableMetaClient);
}
List<String> partitionColumnsList = Arrays.asList(partitionColumns.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class HudiScanNode extends HiveScanNode {

private final AtomicLong noLogsSplitNum = new AtomicLong(0);

private final boolean useHiveSyncPartition;

/**
* External file scan node for Query Hudi table
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
Expand All @@ -97,6 +99,7 @@ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumn
} else {
LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", hmsTable.getName());
}
useHiveSyncPartition = hmsTable.useHiveSyncPartition();
}

@Override
Expand Down Expand Up @@ -166,9 +169,10 @@ private List<HivePartition> getPrunedPartitions(
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
TablePartitionValues partitionValues;
if (snapshotTimestamp.isPresent()) {
partitionValues = processor.getSnapshotPartitionValues(hmsTable, metaClient, snapshotTimestamp.get());
partitionValues = processor.getSnapshotPartitionValues(
hmsTable, metaClient, snapshotTimestamp.get(), useHiveSyncPartition);
} else {
partitionValues = processor.getPartitionValues(hmsTable, metaClient);
partitionValues = processor.getPartitionValues(hmsTable, metaClient, useHiveSyncPartition);
}
if (partitionValues != null) {
// 2. prune partitions by expr
Expand Down