From f129b73c2cd5fc2d09bdb00afdf70797e70b4d23 Mon Sep 17 00:00:00 2001 From: Socrates Date: Fri, 7 Nov 2025 15:29:57 +0800 Subject: [PATCH] fix --- be/src/vec/exec/scan/vfile_scanner.cpp | 6 ++ .../iceberg/source/IcebergScanNode.java | 78 +++++++------------ 2 files changed, 36 insertions(+), 48 deletions(-) diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 6d7dac8d57302a..ae005f61ddcc02 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -1013,6 +1013,12 @@ Status VFileScanner::_generate_fill_columns() { if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) { for (const auto& slot_desc : _partition_slot_descs) { if (slot_desc) { + // Only fill partition column from path if it's missing from file + // If partition column exists in file, use the value from file instead + if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) { + // Partition column exists in file, skip filling from path + continue; + } auto it = _partition_slot_index_map.find(slot_desc->id()); if (it == std::end(_partition_slot_index_map)) { return Status::InternalError("Unknown source slot descriptor, slot_id={}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 3a20a64599e18f..c0723e46f85903 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -57,6 +57,7 @@ import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionField; import org.apache.iceberg.Snapshot; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; @@ -64,6 +65,9 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.logging.log4j.LogManager; @@ -77,6 +81,7 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.stream.Collectors; public class IcebergScanNode extends FileQueryScanNode { @@ -226,44 +231,32 @@ private List doGetSplits(int numBackends) throws UserException { boolean isPartitionedTable = icebergTable.spec().isPartitioned(); long realFileSplitSize = getRealFileSplitSize(0); - CloseableIterable fileScanTasks = null; - try { - fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), realFileSplitSize); - } catch (NullPointerException e) { - /* - Caused by: java.lang.NullPointerException: Type cannot be null - at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull - (Preconditions.java:921) ~[iceberg-bundled-guava-1.4.3.jar:?] - at org.apache.iceberg.types.Types$NestedField.(Types.java:447) ~[iceberg-api-1.4.3.jar:?] - at org.apache.iceberg.types.Types$NestedField.optional(Types.java:416) ~[iceberg-api-1.4.3.jar:?] - at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132) ~[iceberg-api-1.4.3.jar:?] - at org.apache.iceberg.DeleteFileIndex.lambda$new$0(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?] - at org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.forEach - (RegularImmutableMap.java:297) ~[iceberg-bundled-guava-1.4.3.jar:?] - at org.apache.iceberg.DeleteFileIndex.(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?] - at org.apache.iceberg.DeleteFileIndex.(DeleteFileIndex.java:71) ~[iceberg-core-1.4.3.jar:?] - at org.apache.iceberg.DeleteFileIndex$Builder.build(DeleteFileIndex.java:578) ~[iceberg-core-1.4.3.jar:?] - at org.apache.iceberg.ManifestGroup.plan(ManifestGroup.java:183) ~[iceberg-core-1.4.3.jar:?] - at org.apache.iceberg.ManifestGroup.planFiles(ManifestGroup.java:170) ~[iceberg-core-1.4.3.jar:?] - at org.apache.iceberg.DataTableScan.doPlanFiles(DataTableScan.java:89) ~[iceberg-core-1.4.3.jar:?] - at org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139) ~[iceberg-core-1.4.3.jar:?] - at org.apache.doris.datasource.iceberg.source.IcebergScanNode.doGetSplits - (IcebergScanNode.java:209) ~[doris-fe.jar:1.2-SNAPSHOT] - EXAMPLE: - CREATE TABLE iceberg_tb(col1 INT,col2 STRING) USING ICEBERG PARTITIONED BY (bucket(10,col2)); - INSERT INTO iceberg_tb VALUES( ... ); - ALTER TABLE iceberg_tb DROP PARTITION FIELD bucket(10,col2); - ALTER TABLE iceberg_tb DROP COLUMNS col2 STRING; - Link: https://github.com/apache/iceberg/pull/10755 - */ - LOG.warn("Iceberg TableScanUtil.splitFiles throw NullPointerException. Cause : ", e); - throw new NotSupportedException("Unable to read Iceberg table with dropped old partition column."); - } + CloseableIterable fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), realFileSplitSize); try (CloseableIterable combinedScanTasks = TableScanUtil.planTasks(fileScanTasks, realFileSplitSize, 1, 0)) { combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> { + List partitionValues = new ArrayList<>(); if (isPartitionedTable) { StructLike structLike = splitTask.file().partition(); + List fields = splitTask.spec().fields(); + Types.StructType structType = icebergTable.schema().asStruct(); + + // set partitionValue for this IcebergSplit + for (int i = 0; i < structLike.size(); i++) { + Object obj = structLike.get(i, Object.class); + String value = String.valueOf(obj); + PartitionField partitionField = fields.get(i); + if (partitionField.transform().isIdentity()) { + Type type = structType.fieldType(partitionField.name()); + if (type != null && type.typeId().equals(Type.TypeID.DATE)) { + // iceberg use integer to store date, + // we need transform it to string + value = DateTimeUtil.daysToIsoDate((Integer) obj); + } + } + partitionValues.add(value); + } + // Counts the number of partitions read partitionPathSet.add(structLike.toString()); } @@ -277,7 +270,7 @@ private List doGetSplits(int numBackends) throws UserException { new String[0], formatVersion, source.getCatalog().getProperties(), - new ArrayList<>(), + partitionValues, originalPath); split.setTargetSplitSize(realFileSplitSize); if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) { @@ -310,6 +303,7 @@ private List doGetSplits(int numBackends) throws UserException { return pushDownCountSplits; } } + selectedPartitionNum = partitionPathSet.size(); return splits; } @@ -371,20 +365,8 @@ public TFileFormatType getFileFormatType() throws UserException { @Override public List getPathPartitionKeys() throws UserException { - // return icebergTable.spec().fields().stream().map(PartitionField::name).map(String::toLowerCase) - // .collect(Collectors.toList()); - /**First, iceberg partition columns are based on existing fields, which will be stored in the actual data file. - * Second, iceberg partition columns support Partition transforms. In this case, the path partition key is not - * equal to the column name of the partition column, so remove this code and get all the columns you want to - * read from the file. - * Related code: - * be/src/vec/exec/scan/vfile_scanner.cpp: - * VFileScanner::_init_expr_ctxes() - * if (slot_info.is_file_slot) { - * xxxx - * } - */ - return new ArrayList<>(); + return icebergTable.spec().fields().stream().map(PartitionField::name).map(String::toLowerCase) + .collect(Collectors.toList()); } @Override