Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,12 @@ Status VFileScanner::_generate_fill_columns() {
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
for (const auto& slot_desc : _partition_slot_descs) {
if (slot_desc) {
// Only fill partition column from path if it's missing from file
// If partition column exists in file, use the value from file instead
if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
// Partition column exists in file, skip filling from path
continue;
}
auto it = _partition_slot_index_map.find(slot_desc->id());
if (it == std::end(_partition_slot_index_map)) {
return Status::InternalError("Unknown source slot descriptor, slot_id={}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,17 @@
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.logging.log4j.LogManager;
Expand All @@ -77,6 +81,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;

public class IcebergScanNode extends FileQueryScanNode {

Expand Down Expand Up @@ -226,44 +231,32 @@ private List<Split> doGetSplits(int numBackends) throws UserException {
boolean isPartitionedTable = icebergTable.spec().isPartitioned();

long realFileSplitSize = getRealFileSplitSize(0);
CloseableIterable<FileScanTask> fileScanTasks = null;
try {
fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), realFileSplitSize);
} catch (NullPointerException e) {
/*
Caused by: java.lang.NullPointerException: Type cannot be null
at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull
(Preconditions.java:921) ~[iceberg-bundled-guava-1.4.3.jar:?]
at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:447) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.types.Types$NestedField.optional(Types.java:416) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132) ~[iceberg-api-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.lambda$new$0(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap.forEach
(RegularImmutableMap.java:297) ~[iceberg-bundled-guava-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:97) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex.<init>(DeleteFileIndex.java:71) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DeleteFileIndex$Builder.build(DeleteFileIndex.java:578) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.ManifestGroup.plan(ManifestGroup.java:183) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.ManifestGroup.planFiles(ManifestGroup.java:170) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.DataTableScan.doPlanFiles(DataTableScan.java:89) ~[iceberg-core-1.4.3.jar:?]
at org.apache.iceberg.SnapshotScan.planFiles(SnapshotScan.java:139) ~[iceberg-core-1.4.3.jar:?]
at org.apache.doris.datasource.iceberg.source.IcebergScanNode.doGetSplits
(IcebergScanNode.java:209) ~[doris-fe.jar:1.2-SNAPSHOT]
EXAMPLE:
CREATE TABLE iceberg_tb(col1 INT,col2 STRING) USING ICEBERG PARTITIONED BY (bucket(10,col2));
INSERT INTO iceberg_tb VALUES( ... );
ALTER TABLE iceberg_tb DROP PARTITION FIELD bucket(10,col2);
ALTER TABLE iceberg_tb DROP COLUMNS col2 STRING;
Link: https://github.com/apache/iceberg/pull/10755
*/
LOG.warn("Iceberg TableScanUtil.splitFiles throw NullPointerException. Cause : ", e);
throw new NotSupportedException("Unable to read Iceberg table with dropped old partition column.");
}
CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), realFileSplitSize);
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
TableScanUtil.planTasks(fileScanTasks, realFileSplitSize, 1, 0)) {
combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
List<String> partitionValues = new ArrayList<>();
if (isPartitionedTable) {
StructLike structLike = splitTask.file().partition();
List<PartitionField> fields = splitTask.spec().fields();
Types.StructType structType = icebergTable.schema().asStruct();

// set partitionValue for this IcebergSplit
for (int i = 0; i < structLike.size(); i++) {
Object obj = structLike.get(i, Object.class);
String value = String.valueOf(obj);
PartitionField partitionField = fields.get(i);
if (partitionField.transform().isIdentity()) {
Type type = structType.fieldType(partitionField.name());
if (type != null && type.typeId().equals(Type.TypeID.DATE)) {
// iceberg use integer to store date,
// we need transform it to string
value = DateTimeUtil.daysToIsoDate((Integer) obj);
}
}
partitionValues.add(value);
}

// Counts the number of partitions read
partitionPathSet.add(structLike.toString());
}
Expand All @@ -277,7 +270,7 @@ private List<Split> doGetSplits(int numBackends) throws UserException {
new String[0],
formatVersion,
source.getCatalog().getProperties(),
new ArrayList<>(),
partitionValues,
originalPath);
split.setTargetSplitSize(realFileSplitSize);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
Expand Down Expand Up @@ -310,6 +303,7 @@ private List<Split> doGetSplits(int numBackends) throws UserException {
return pushDownCountSplits;
}
}

selectedPartitionNum = partitionPathSet.size();
return splits;
}
Expand Down Expand Up @@ -371,20 +365,8 @@ public TFileFormatType getFileFormatType() throws UserException {

@Override
public List<String> getPathPartitionKeys() throws UserException {
// return icebergTable.spec().fields().stream().map(PartitionField::name).map(String::toLowerCase)
// .collect(Collectors.toList());
/**First, iceberg partition columns are based on existing fields, which will be stored in the actual data file.
* Second, iceberg partition columns support Partition transforms. In this case, the path partition key is not
* equal to the column name of the partition column, so remove this code and get all the columns you want to
* read from the file.
* Related code:
* be/src/vec/exec/scan/vfile_scanner.cpp:
* VFileScanner::_init_expr_ctxes()
* if (slot_info.is_file_slot) {
* xxxx
* }
*/
return new ArrayList<>();
return icebergTable.spec().fields().stream().map(PartitionField::name).map(String::toLowerCase)
.collect(Collectors.toList());
}

@Override
Expand Down