diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java index 27e3d1c1ddad..874c22134864 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java @@ -95,7 +95,7 @@ public ColumnarRowIterator mapping( vectors = VectorMappingUtils.createPartitionMappedVectors(partitionInfo, vectors); } if (indexMapping != null) { - vectors = VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors); + vectors = VectorMappingUtils.createMappedVectors(indexMapping, vectors); } return copy(vectors); } diff --git a/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java b/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java index d438bfb0ffe9..3288276a1f64 100644 --- a/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java +++ b/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java @@ -99,6 +99,10 @@ public static boolean isSystemField(String field) { return field.startsWith(KEY_FIELD_PREFIX) || SYSTEM_FIELD_NAMES.contains(field); } + public static boolean isKeyField(String field) { + return field.startsWith(KEY_FIELD_PREFIX); + } + // ---------------------------------------------------------------------------------------- // Structured type fields // ---------------------------------------------------------------------------------------- diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java index 8b01e644de57..02b011a2f1cf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java @@ -97,8 +97,7 @@ public static ColumnVector createFixedVector( return dataType.accept(visitor); } - public static ColumnVector[] createIndexMappedVectors( - int[] indexMapping, ColumnVector[] vectors) { + public static ColumnVector[] createMappedVectors(int[] indexMapping, ColumnVector[] vectors) { ColumnVector[] newVectors = new ColumnVector[indexMapping.length]; for (int i = 0; i < indexMapping.length; i++) { int realIndex = indexMapping[i]; diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java index c5fac9c880db..571a0d7189d6 100644 --- a/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java @@ -82,7 +82,7 @@ public void testCreateIndexMappedVectors() { int[] mapping = new int[] {0, 2, 1, 3, 2, 3, 1, 0, 4}; ColumnVector[] newColumnVectors = - VectorMappingUtils.createIndexMappedVectors(mapping, columnVectors); + VectorMappingUtils.createMappedVectors(mapping, columnVectors); for (int i = 0; i < mapping.length; i++) { Assertions.assertThat(newColumnVectors[i]).isEqualTo(columnVectors[mapping[i]]); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java index d2559fe6240b..16fad55a49a2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java @@ -76,6 +76,7 @@ public FileRecordIterator readBatch() throws IOException { PartitionSettedRow.from(partitionInfo); iterator = iterator.transform(partitionSettedRow::replaceRow); } + if (indexMapping != null) { final ProjectedRow projectedRow = ProjectedRow.from(indexMapping); iterator = iterator.transform(projectedRow::replaceRow); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index 037622f95f1e..58ef924df178 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -26,6 +26,7 @@ import org.apache.paimon.schema.IndexCastMapping; import org.apache.paimon.schema.SchemaEvolutionUtil; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -35,17 +36,25 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.function.Function; import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields; +import static org.apache.paimon.table.SpecialFields.KEY_FIELD_ID_START; /** Class with index mapping and bulk format. */ public class BulkFormatMapping { + // Index mapping from data schema fields to table schema fields, this is used to realize paimon + // schema evolution. And it combines trimeedKeyMapping, which maps key fields to the value + // fields @Nullable private final int[] indexMapping; + // help indexMapping to cast different data type @Nullable private final CastFieldGetter[] castMapping; + // partition fields mapping, add partition fields to the read fields @Nullable private final Pair partitionPair; private final FormatReaderFactory bulkFormat; private final TableSchema dataSchema; @@ -54,11 +63,12 @@ public class BulkFormatMapping { public BulkFormatMapping( @Nullable int[] indexMapping, @Nullable CastFieldGetter[] castMapping, + @Nullable int[] trimmedKeyMapping, @Nullable Pair partitionPair, FormatReaderFactory bulkFormat, TableSchema dataSchema, List dataFilters) { - this.indexMapping = indexMapping; + this.indexMapping = combine(indexMapping, trimmedKeyMapping); this.castMapping = castMapping; this.bulkFormat = bulkFormat; this.partitionPair = partitionPair; @@ -66,6 +76,26 @@ public BulkFormatMapping( this.dataFilters = dataFilters; } + private int[] combine(@Nullable int[] indexMapping, @Nullable int[] trimmedKeyMapping) { + if (indexMapping == null) { + return trimmedKeyMapping; + } + if (trimmedKeyMapping == null) { + return indexMapping; + } + + int[] combined = new int[indexMapping.length]; + + for (int i = 0; i < indexMapping.length; i++) { + if (indexMapping[i] < 0) { + combined[i] = indexMapping[i]; + } else { + combined[i] = trimmedKeyMapping[indexMapping[i]]; + } + } + return combined; + } + @Nullable public int[] getIndexMapping() { return indexMapping; @@ -112,24 +142,46 @@ public BulkFormatMappingBuilder( this.filters = filters; } + /** + * There are three steps here to build BulkFormatMapping: + * + *

1. Calculate the readDataFields, which is what we intend to read from the data schema. + * Meanwhile, generate the indexCastMapping, which is used to map the index of the + * readDataFields to the index of the data schema. + * + *

2. Calculate the mapping to trim _KEY_ fields. For example: we want _KEY_a, _KEY_b, + * _FIELD_SEQUENCE, _ROW_KIND, a, b, c, d, e, f, g from the data, but actually we don't need + * to read _KEY_a and a, _KEY_b and b the same time, so we need to trim them. So we mapping + * it: read before: _KEY_a, _KEY_b, _FIELD_SEQUENCE, _ROW_KIND, a, b, c, d, e, f, g read + * after: a, b, _FIELD_SEQUENCE, _ROW_KIND, c, d, e, f, g and the mapping is + * [0,1,2,3,0,1,4,5,6,7,8], it converts the [read after] columns to [read before] columns. + * + *

3. We want read much fewer fields than readDataFields, so we kick out the partition + * fields. We generate the partitionMappingAndFieldsWithoutPartitionPair which helps reduce + * the real read fields and tell us how to map it back. + */ public BulkFormatMapping build( String formatIdentifier, TableSchema tableSchema, TableSchema dataSchema) { - List readDataFields = readDataFields(dataSchema); - + // extract the whole data fields in logic. + List allDataFields = fieldsExtractor.apply(dataSchema); + List readDataFields = readDataFields(allDataFields); // build index cast mapping IndexCastMapping indexCastMapping = SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields); + // map from key fields reading to value fields reading + Pair trimmedKeyPair = trimKeyFields(readDataFields, allDataFields); + // build partition mapping and filter partition fields Pair, List> partitionMappingAndFieldsWithoutPartitionPair = - PartitionUtils.constructPartitionMapping(dataSchema, readDataFields); + PartitionUtils.constructPartitionMapping( + dataSchema, trimmedKeyPair.getRight().getFields()); Pair partitionMapping = partitionMappingAndFieldsWithoutPartitionPair.getLeft(); - // build read row type - RowType readDataRowType = + RowType readRowType = new RowType(partitionMappingAndFieldsWithoutPartitionPair.getRight()); // build read filters @@ -138,18 +190,55 @@ public BulkFormatMapping build( return new BulkFormatMapping( indexCastMapping.getIndexMapping(), indexCastMapping.getCastMapping(), + trimmedKeyPair.getLeft(), partitionMapping, formatDiscover .discover(formatIdentifier) - .createReaderFactory(readDataRowType, readFilters), + .createReaderFactory(readRowType, readFilters), dataSchema, readFilters); } - private List readDataFields(TableSchema dataSchema) { - List dataFields = fieldsExtractor.apply(dataSchema); + static Pair trimKeyFields( + List fieldsWithoutPartition, List fields) { + int[] map = new int[fieldsWithoutPartition.size()]; + List trimmedFields = new ArrayList<>(); + Map fieldMap = new HashMap<>(); + Map positionMap = new HashMap<>(); + + for (DataField field : fields) { + fieldMap.put(field.id(), field); + } + + for (int i = 0; i < fieldsWithoutPartition.size(); i++) { + DataField field = fieldsWithoutPartition.get(i); + boolean keyField = SpecialFields.isKeyField(field.name()); + int id = keyField ? field.id() - KEY_FIELD_ID_START : field.id(); + // field in data schema + DataField f = fieldMap.get(id); + + if (f != null) { + if (positionMap.containsKey(id)) { + map[i] = positionMap.get(id); + } else { + map[i] = positionMap.computeIfAbsent(id, k -> trimmedFields.size()); + // If the target field is not key field, we remain what it is, because it + // may be projected. Example: the target field is a row type, but only read + // the few fields in it. If we simply trimmedFields.add(f), we will read + // more fields than we need. + trimmedFields.add(keyField ? f : field); + } + } else { + throw new RuntimeException("Can't find field with id: " + id + " in fields."); + } + } + + return Pair.of(map, new RowType(trimmedFields)); + } + + private List readDataFields(List allDataFields) { List readDataFields = new ArrayList<>(); - for (DataField dataField : dataFields) { + for (DataField dataField : allDataFields) { readTableFields.stream() .filter(f -> f.id() == dataField.id()) .findFirst() diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java index 9384eb88824d..1be5993fb0d0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java @@ -452,7 +452,7 @@ public void testIdentifierAfterFullCompaction() throws Exception { containSameIdentifyEntryFile(fullCompacted, entryIdentifierExpected); } - @RepeatedTest(1000) + @RepeatedTest(10) public void testRandomFullCompaction() throws Exception { List input = new ArrayList<>(); Set manifestEntrySet = new HashSet<>(); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java new file mode 100644 index 000000000000..4d5d6e32e85d --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.schema.IndexCastMapping; +import org.apache.paimon.schema.SchemaEvolutionUtil; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +/** Test for {@link BulkFormatMapping.BulkFormatMappingBuilder}. */ +public class BulkFormatMappingTest { + + @Test + public void testTrimKeyFields() { + List keyFields = new ArrayList<>(); + List allFields = new ArrayList<>(); + List testFields = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + keyFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + i, + SpecialFields.KEY_FIELD_PREFIX + i, + DataTypes.STRING())); + } + + allFields.addAll(keyFields); + for (int i = 0; i < 20; i++) { + allFields.add(new DataField(i, String.valueOf(i), DataTypes.STRING())); + } + + testFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + 1, + SpecialFields.KEY_FIELD_PREFIX + 1, + DataTypes.STRING())); + testFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + 3, + SpecialFields.KEY_FIELD_PREFIX + 3, + DataTypes.STRING())); + testFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + 5, + SpecialFields.KEY_FIELD_PREFIX + 5, + DataTypes.STRING())); + testFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + 7, + SpecialFields.KEY_FIELD_PREFIX + 7, + DataTypes.STRING())); + testFields.add(new DataField(3, String.valueOf(3), DataTypes.STRING())); + testFields.add(new DataField(4, String.valueOf(4), DataTypes.STRING())); + testFields.add(new DataField(5, String.valueOf(5), DataTypes.STRING())); + testFields.add(new DataField(1, String.valueOf(1), DataTypes.STRING())); + testFields.add(new DataField(6, String.valueOf(6), DataTypes.STRING())); + + Pair res = + BulkFormatMapping.BulkFormatMappingBuilder.trimKeyFields(testFields, allFields); + + Assertions.assertThat(res.getKey()).containsExactly(0, 1, 2, 3, 1, 4, 2, 0, 5); + + List fields = res.getRight().getFields(); + Assertions.assertThat(fields.size()).isEqualTo(6); + Assertions.assertThat(fields.get(0).id()).isEqualTo(1); + Assertions.assertThat(fields.get(1).id()).isEqualTo(3); + Assertions.assertThat(fields.get(2).id()).isEqualTo(5); + Assertions.assertThat(fields.get(3).id()).isEqualTo(7); + Assertions.assertThat(fields.get(4).id()).isEqualTo(4); + Assertions.assertThat(fields.get(5).id()).isEqualTo(6); + } + + @Test + public void testTrimKeyWithIndexMapping() { + List readTableFields = new ArrayList<>(); + List readDataFields = new ArrayList<>(); + + readTableFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + 1, + SpecialFields.KEY_FIELD_PREFIX + "a", + DataTypes.STRING())); + readTableFields.add(new DataField(0, "0", DataTypes.STRING())); + readTableFields.add(new DataField(1, "a", DataTypes.STRING())); + readTableFields.add(new DataField(2, "2", DataTypes.STRING())); + readTableFields.add(new DataField(3, "3", DataTypes.STRING())); + + readDataFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + 1, + SpecialFields.KEY_FIELD_PREFIX + "a", + DataTypes.STRING())); + readDataFields.add(new DataField(0, "0", DataTypes.STRING())); + readDataFields.add(new DataField(1, "a", DataTypes.STRING())); + readDataFields.add(new DataField(3, "3", DataTypes.STRING())); + + // build index cast mapping + IndexCastMapping indexCastMapping = + SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields); + + // map from key fields reading to value fields reading + Pair trimmedKeyPair = + BulkFormatMapping.BulkFormatMappingBuilder.trimKeyFields( + readDataFields, readDataFields); + + BulkFormatMapping bulkFormatMapping = + new BulkFormatMapping( + indexCastMapping.getIndexMapping(), + indexCastMapping.getCastMapping(), + trimmedKeyPair.getLeft(), + null, + null, + null, + null); + + Assertions.assertThat(bulkFormatMapping.getIndexMapping()).containsExactly(0, 1, 0, -1, 2); + List trimmed = trimmedKeyPair.getRight().getFields(); + Assertions.assertThat(trimmed.get(0).id()).isEqualTo(1); + Assertions.assertThat(trimmed.get(1).id()).isEqualTo(0); + Assertions.assertThat(trimmed.get(2).id()).isEqualTo(3); + Assertions.assertThat(trimmed.size()).isEqualTo(3); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 85679e5fd30a..95aa9e68bbf3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -69,7 +69,7 @@ public class TestChangelogDataReadWrite { private static final RowType KEY_TYPE = new RowType(singletonList(new DataField(0, "k", new BigIntType()))); private static final RowType VALUE_TYPE = - new RowType(singletonList(new DataField(0, "v", new BigIntType()))); + new RowType(singletonList(new DataField(1, "v", new BigIntType()))); private static final RowType PARTITION_TYPE = new RowType(singletonList(new DataField(0, "p", new IntType()))); private static final Comparator COMPARATOR = @@ -87,7 +87,7 @@ public List keyFields(TableSchema schema) { @Override public List valueFields(TableSchema schema) { return Collections.singletonList( - new DataField(0, "v", new org.apache.paimon.types.BigIntType(false))); + new DataField(1, "v", new org.apache.paimon.types.BigIntType(false))); } };