From 4d2ceed5ff9f47add0b5d86e4e395137bb9d9e3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 6 Dec 2024 13:34:14 +0800 Subject: [PATCH 01/17] [core] Trim key field in reading, map it to value field --- .../data/columnar/ColumnarRowIterator.java | 11 ++- .../apache/paimon/table/SpecialFields.java | 6 ++ .../paimon/utils/VectorMappingUtils.java | 3 +- .../paimon/io/DataFileRecordReader.java | 13 ++- .../paimon/io/KeyValueFileReaderFactory.java | 3 +- .../paimon/operation/RawFileSplitRead.java | 3 +- .../paimon/utils/BulkFormatMapping.java | 84 +++++++++++++++++-- 7 files changed, 105 insertions(+), 18 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java index 27e3d1c1ddad..a0d7d876658e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java @@ -87,15 +87,20 @@ public ColumnarRowIterator copy(ColumnVector[] vectors) { } public ColumnarRowIterator mapping( - @Nullable PartitionInfo partitionInfo, @Nullable int[] indexMapping) { - if (partitionInfo != null || indexMapping != null) { + @Nullable int[] trimmedKeyMapping, + @Nullable PartitionInfo partitionInfo, + @Nullable int[] indexMapping) { + if (trimmedKeyMapping != null || partitionInfo != null || indexMapping != null) { VectorizedColumnBatch vectorizedColumnBatch = row.batch(); ColumnVector[] vectors = vectorizedColumnBatch.columns; + if (trimmedKeyMapping != null) { + vectors = VectorMappingUtils.createMappedVectors(trimmedKeyMapping, vectors); + } if (partitionInfo != null) { vectors = VectorMappingUtils.createPartitionMappedVectors(partitionInfo, vectors); } if (indexMapping != null) { - vectors = VectorMappingUtils.createIndexMappedVectors(indexMapping, vectors); + vectors = VectorMappingUtils.createMappedVectors(indexMapping, vectors); } return copy(vectors); } diff --git a/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java b/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java index d438bfb0ffe9..68f7554ab406 100644 --- a/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java +++ b/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java @@ -72,6 +72,8 @@ public class SpecialFields { public static final String KEY_FIELD_PREFIX = "_KEY_"; public static final int KEY_FIELD_ID_START = SYSTEM_FIELD_ID_START; + // reserve 1000 for other system fields + public static final int KEY_FIELD_ID_END = Integer.MAX_VALUE - 1_000; public static final DataField SEQUENCE_NUMBER = new DataField(Integer.MAX_VALUE - 1, "_SEQUENCE_NUMBER", DataTypes.BIGINT().notNull()); @@ -99,6 +101,10 @@ public static boolean isSystemField(String field) { return field.startsWith(KEY_FIELD_PREFIX) || SYSTEM_FIELD_NAMES.contains(field); } + public static boolean isKeyField(int id) { + return id >= KEY_FIELD_ID_START && id < KEY_FIELD_ID_END; + } + // ---------------------------------------------------------------------------------------- // Structured type fields // ---------------------------------------------------------------------------------------- diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java index 8b01e644de57..02b011a2f1cf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/VectorMappingUtils.java @@ -97,8 +97,7 @@ public static ColumnVector createFixedVector( return dataType.accept(visitor); } - public static ColumnVector[] createIndexMappedVectors( - int[] indexMapping, ColumnVector[] vectors) { + public static ColumnVector[] createMappedVectors(int[] indexMapping, ColumnVector[] vectors) { ColumnVector[] newVectors = new ColumnVector[indexMapping.length]; for (int i = 0; i < indexMapping.length; i++) { int realIndex = indexMapping[i]; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java index d2559fe6240b..8548ec2bc8f5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java @@ -41,13 +41,15 @@ public class DataFileRecordReader implements FileRecordReader { @Nullable private final int[] indexMapping; @Nullable private final PartitionInfo partitionInfo; @Nullable private final CastFieldGetter[] castMapping; + @Nullable private final int[] trimmedKeyMapping; public DataFileRecordReader( FormatReaderFactory readerFactory, FormatReaderFactory.Context context, @Nullable int[] indexMapping, @Nullable CastFieldGetter[] castMapping, - @Nullable PartitionInfo partitionInfo) + @Nullable PartitionInfo partitionInfo, + @Nullable int[] trimmedKeyMapping) throws IOException { try { this.reader = readerFactory.createReader(context); @@ -58,6 +60,7 @@ public DataFileRecordReader( this.indexMapping = indexMapping; this.partitionInfo = partitionInfo; this.castMapping = castMapping; + this.trimmedKeyMapping = trimmedKeyMapping; } @Nullable @@ -69,8 +72,14 @@ public FileRecordIterator readBatch() throws IOException { } if (iterator instanceof ColumnarRowIterator) { - iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo, indexMapping); + iterator = + ((ColumnarRowIterator) iterator) + .mapping(trimmedKeyMapping, partitionInfo, indexMapping); } else { + if (trimmedKeyMapping != null) { + final ProjectedRow projectedRow = ProjectedRow.from(trimmedKeyMapping); + iterator = iterator.transform(projectedRow::replaceRow); + } if (partitionInfo != null) { final PartitionSettedRow partitionSettedRow = PartitionSettedRow.from(partitionInfo); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 7d3acd729c55..26b272d772e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -144,7 +144,8 @@ private FileRecordReader createRecordReader( fileIO, filePath, fileSize, orcPoolSize), bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), - PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); + PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition), + bulkFormatMapping.getTrimmedKeyMapping()); Optional deletionVector = dvFactory.create(fileName); if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 46977457c4be..6cc3006175b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -221,7 +221,8 @@ private FileRecordReader createFileReader( formatReaderContext, bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), - PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); + PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition), + bulkFormatMapping.getTrimmedKeyMapping()); if (fileIndexResult instanceof BitmapIndexResult) { fileRecordReader = diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index 037622f95f1e..ecf509462179 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -26,6 +26,7 @@ import org.apache.paimon.schema.IndexCastMapping; import org.apache.paimon.schema.SchemaEvolutionUtil; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -35,11 +36,15 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields; +import static org.apache.paimon.table.SpecialFields.KEY_FIELD_ID_START; /** Class with index mapping and bulk format. */ public class BulkFormatMapping { @@ -47,6 +52,7 @@ public class BulkFormatMapping { @Nullable private final int[] indexMapping; @Nullable private final CastFieldGetter[] castMapping; @Nullable private final Pair partitionPair; + @Nullable private final int[] trimmedKeyMapping; private final FormatReaderFactory bulkFormat; private final TableSchema dataSchema; private final List dataFilters; @@ -55,6 +61,7 @@ public BulkFormatMapping( @Nullable int[] indexMapping, @Nullable CastFieldGetter[] castMapping, @Nullable Pair partitionPair, + @Nullable int[] trimmedKeyMapping, FormatReaderFactory bulkFormat, TableSchema dataSchema, List dataFilters) { @@ -62,6 +69,7 @@ public BulkFormatMapping( this.castMapping = castMapping; this.bulkFormat = bulkFormat; this.partitionPair = partitionPair; + this.trimmedKeyMapping = trimmedKeyMapping; this.dataSchema = dataSchema; this.dataFilters = dataFilters; } @@ -81,6 +89,11 @@ public Pair getPartitionPair() { return partitionPair; } + @Nullable + public int[] getTrimmedKeyMapping() { + return trimmedKeyMapping; + } + public FormatReaderFactory getReaderFactory() { return bulkFormat; } @@ -112,11 +125,27 @@ public BulkFormatMappingBuilder( this.filters = filters; } + /** + * There are three steps here to build BulkFormatMapping: + * + *

1. Calculate the readDataFields, which is what we intend to read from the data schema. + * Meanwhile, generate the indexCastMapping, which is used to map the index of the + * readDataFields to the index of the data schema. + * + *

2. We want read much fewer fields than readDataFields, so we kick out the partition + * fields. We generate the partitionMappingAndFieldsWithoutPartitionPair which helps reduce + * the real read fields and tell us how to map it back. + * + *

3. We still want read fewer fields, so we combine the _KEY_xxx fields to xxx fields. + * They are always the same, we just need to get once. We generate trimmedKeyPair to reduce + * the real read fields again, also it tells us how to map it back. + */ public BulkFormatMapping build( String formatIdentifier, TableSchema tableSchema, TableSchema dataSchema) { - List readDataFields = readDataFields(dataSchema); - + // extract the whole data fields in logic. + List allDataFields = fieldsExtractor.apply(dataSchema); + List readDataFields = readDataFields(allDataFields); // build index cast mapping IndexCastMapping indexCastMapping = SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields); @@ -128,9 +157,12 @@ public BulkFormatMapping build( Pair partitionMapping = partitionMappingAndFieldsWithoutPartitionPair.getLeft(); - // build read row type - RowType readDataRowType = - new RowType(partitionMappingAndFieldsWithoutPartitionPair.getRight()); + List fieldsWithoutPartition = + partitionMappingAndFieldsWithoutPartitionPair.getRight(); + + // map from key fields reading to value fields reading + Pair trimmedKeyPair = + trimKeyFields(fieldsWithoutPartition, allDataFields); // build read filters List readFilters = readFilters(filters, tableSchema, dataSchema); @@ -139,17 +171,51 @@ public BulkFormatMapping build( indexCastMapping.getIndexMapping(), indexCastMapping.getCastMapping(), partitionMapping, + trimmedKeyPair.getLeft(), formatDiscover .discover(formatIdentifier) - .createReaderFactory(readDataRowType, readFilters), + .createReaderFactory(trimmedKeyPair.getRight(), readFilters), dataSchema, readFilters); } - private List readDataFields(TableSchema dataSchema) { - List dataFields = fieldsExtractor.apply(dataSchema); + private Pair trimKeyFields( + List fieldsWithoutPartition, List fields) { + int[] map = new int[fieldsWithoutPartition.size()]; + List trimmedFields = new ArrayList<>(); + Map fieldMap = new HashMap<>(); + Map positionMap = new HashMap<>(); + + for (DataField field : fields) { + fieldMap.put(field.id(), field); + } + + AtomicInteger index = new AtomicInteger(); + for (int i = 0; i < fieldsWithoutPartition.size(); i++) { + DataField field = fieldsWithoutPartition.get(i); + boolean keyField = SpecialFields.isKeyField(field.id()); + int id = keyField ? field.id() - KEY_FIELD_ID_START : field.id(); + // field in data schema + DataField f = fieldMap.get(id); + + if (f != null) { + if (positionMap.containsKey(id)) { + map[i] = positionMap.get(id); + } else { + trimmedFields.add(keyField ? f : field); + map[i] = positionMap.computeIfAbsent(id, k -> index.getAndIncrement()); + } + } else { + throw new RuntimeException("Can't find field with id: " + id + " in fields."); + } + } + + return Pair.of(map, new RowType(trimmedFields)); + } + + private List readDataFields(List allDataFields) { List readDataFields = new ArrayList<>(); - for (DataField dataField : dataFields) { + for (DataField dataField : allDataFields) { readTableFields.stream() .filter(f -> f.id() == dataField.id()) .findFirst() From 86d0c0185ee5ac23bbecbf31965cb7519aa9df2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 6 Dec 2024 13:35:11 +0800 Subject: [PATCH 02/17] fix comment --- .../java/org/apache/paimon/utils/VectorMappingUtilsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java index c5fac9c880db..571a0d7189d6 100644 --- a/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/utils/VectorMappingUtilsTest.java @@ -82,7 +82,7 @@ public void testCreateIndexMappedVectors() { int[] mapping = new int[] {0, 2, 1, 3, 2, 3, 1, 0, 4}; ColumnVector[] newColumnVectors = - VectorMappingUtils.createIndexMappedVectors(mapping, columnVectors); + VectorMappingUtils.createMappedVectors(mapping, columnVectors); for (int i = 0; i < mapping.length; i++) { Assertions.assertThat(newColumnVectors[i]).isEqualTo(columnVectors[mapping[i]]); From 561138596cfd37c02eb44a5555ab1dd7dec1dc20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 6 Dec 2024 13:47:00 +0800 Subject: [PATCH 03/17] [core] Trim key field in reading, map it to value field --- .../java/org/apache/paimon/manifest/ManifestFileMetaTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java index 9384eb88824d..1be5993fb0d0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java @@ -452,7 +452,7 @@ public void testIdentifierAfterFullCompaction() throws Exception { containSameIdentifyEntryFile(fullCompacted, entryIdentifierExpected); } - @RepeatedTest(1000) + @RepeatedTest(10) public void testRandomFullCompaction() throws Exception { List input = new ArrayList<>(); Set manifestEntrySet = new HashSet<>(); From 62736eb23bd98c49ecfd7d552374c1c8fbc23583 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 6 Dec 2024 14:24:39 +0800 Subject: [PATCH 04/17] temp --- .../paimon/flink/source/TestChangelogDataReadWrite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 85679e5fd30a..95aa9e68bbf3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -69,7 +69,7 @@ public class TestChangelogDataReadWrite { private static final RowType KEY_TYPE = new RowType(singletonList(new DataField(0, "k", new BigIntType()))); private static final RowType VALUE_TYPE = - new RowType(singletonList(new DataField(0, "v", new BigIntType()))); + new RowType(singletonList(new DataField(1, "v", new BigIntType()))); private static final RowType PARTITION_TYPE = new RowType(singletonList(new DataField(0, "p", new IntType()))); private static final Comparator COMPARATOR = @@ -87,7 +87,7 @@ public List keyFields(TableSchema schema) { @Override public List valueFields(TableSchema schema) { return Collections.singletonList( - new DataField(0, "v", new org.apache.paimon.types.BigIntType(false))); + new DataField(1, "v", new org.apache.paimon.types.BigIntType(false))); } }; From 5ad9d38ff9475f698c36b1bd4a909666406f0946 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Fri, 6 Dec 2024 14:37:16 +0800 Subject: [PATCH 05/17] temp --- .../paimon/utils/BulkFormatMapping.java | 2 +- .../paimon/utils/BulkFormatMappingTest.java | 95 +++++++++++++++++++ 2 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index ecf509462179..522886477f01 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -179,7 +179,7 @@ public BulkFormatMapping build( readFilters); } - private Pair trimKeyFields( + static Pair trimKeyFields( List fieldsWithoutPartition, List fields) { int[] map = new int[fieldsWithoutPartition.size()]; List trimmedFields = new ArrayList<>(); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java new file mode 100644 index 000000000000..0c86d720f7f3 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +/** Test for {@link BulkFormatMapping.BulkFormatMappingBuilder}. */ +public class BulkFormatMappingTest { + + @Test + public void tesTtrimKeyFields() { + + List keyFields = new ArrayList<>(); + List allFields = new ArrayList<>(); + List testFields = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + keyFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + i, + SpecialFields.KEY_FIELD_PREFIX + i, + DataTypes.STRING())); + } + + allFields.addAll(keyFields); + for (int i = 0; i < 20; i++) { + allFields.add(new DataField(i, String.valueOf(i), DataTypes.STRING())); + } + + testFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + 1, + SpecialFields.KEY_FIELD_PREFIX + 1, + DataTypes.STRING())); + testFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + 3, + SpecialFields.KEY_FIELD_PREFIX + 3, + DataTypes.STRING())); + testFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + 5, + SpecialFields.KEY_FIELD_PREFIX + 5, + DataTypes.STRING())); + testFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + 7, + SpecialFields.KEY_FIELD_PREFIX + 7, + DataTypes.STRING())); + testFields.add(new DataField(3, String.valueOf(3), DataTypes.STRING())); + testFields.add(new DataField(4, String.valueOf(4), DataTypes.STRING())); + testFields.add(new DataField(5, String.valueOf(5), DataTypes.STRING())); + testFields.add(new DataField(1, String.valueOf(1), DataTypes.STRING())); + testFields.add(new DataField(6, String.valueOf(6), DataTypes.STRING())); + + Pair res = + BulkFormatMapping.BulkFormatMappingBuilder.trimKeyFields(testFields, allFields); + + Assertions.assertThat(res.getKey()).containsExactly(0, 1, 2, 3, 1, 4, 2, 0, 5); + + List fields = res.getRight().getFields(); + Assertions.assertThat(fields.size()).isEqualTo(6); + Assertions.assertThat(fields.get(0).id()).isEqualTo(1); + Assertions.assertThat(fields.get(1).id()).isEqualTo(3); + Assertions.assertThat(fields.get(2).id()).isEqualTo(5); + Assertions.assertThat(fields.get(3).id()).isEqualTo(7); + Assertions.assertThat(fields.get(4).id()).isEqualTo(4); + Assertions.assertThat(fields.get(5).id()).isEqualTo(6); + } +} From 0a68dc08e4a08f44c6bd7fa2606f0d524df844be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Mon, 9 Dec 2024 15:33:40 +0800 Subject: [PATCH 06/17] fix comment --- .../main/java/org/apache/paimon/table/SpecialFields.java | 6 ++---- .../java/org/apache/paimon/utils/BulkFormatMapping.java | 7 ++++++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java b/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java index 68f7554ab406..3288276a1f64 100644 --- a/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java +++ b/paimon-common/src/main/java/org/apache/paimon/table/SpecialFields.java @@ -72,8 +72,6 @@ public class SpecialFields { public static final String KEY_FIELD_PREFIX = "_KEY_"; public static final int KEY_FIELD_ID_START = SYSTEM_FIELD_ID_START; - // reserve 1000 for other system fields - public static final int KEY_FIELD_ID_END = Integer.MAX_VALUE - 1_000; public static final DataField SEQUENCE_NUMBER = new DataField(Integer.MAX_VALUE - 1, "_SEQUENCE_NUMBER", DataTypes.BIGINT().notNull()); @@ -101,8 +99,8 @@ public static boolean isSystemField(String field) { return field.startsWith(KEY_FIELD_PREFIX) || SYSTEM_FIELD_NAMES.contains(field); } - public static boolean isKeyField(int id) { - return id >= KEY_FIELD_ID_START && id < KEY_FIELD_ID_END; + public static boolean isKeyField(String field) { + return field.startsWith(KEY_FIELD_PREFIX); } // ---------------------------------------------------------------------------------------- diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index 522886477f01..d5d28ea4519f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -49,9 +49,14 @@ /** Class with index mapping and bulk format. */ public class BulkFormatMapping { + // index mapping from data schema fields to table schema fields, this is used to realize paimon + // schema evolution @Nullable private final int[] indexMapping; + // help indexMapping to cast defferent data type @Nullable private final CastFieldGetter[] castMapping; + // partition fields mapping, add partition fields to the read fields @Nullable private final Pair partitionPair; + // key fields mapping, add key fields to the read fields @Nullable private final int[] trimmedKeyMapping; private final FormatReaderFactory bulkFormat; private final TableSchema dataSchema; @@ -193,7 +198,7 @@ static Pair trimKeyFields( AtomicInteger index = new AtomicInteger(); for (int i = 0; i < fieldsWithoutPartition.size(); i++) { DataField field = fieldsWithoutPartition.get(i); - boolean keyField = SpecialFields.isKeyField(field.id()); + boolean keyField = SpecialFields.isKeyField(field.name()); int id = keyField ? field.id() - KEY_FIELD_ID_START : field.id(); // field in data schema DataField f = fieldMap.get(id); From 3f102a319cd5ee2557652c2a98555aecd9b814c4 Mon Sep 17 00:00:00 2001 From: YeJunHao <41894543+leaves12138@users.noreply.github.com> Date: Mon, 9 Dec 2024 16:46:39 +0800 Subject: [PATCH 07/17] Update paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java Co-authored-by: tsreaper --- .../java/org/apache/paimon/utils/BulkFormatMappingTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java index 0c86d720f7f3..dc870cd39caa 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java @@ -33,7 +33,7 @@ public class BulkFormatMappingTest { @Test - public void tesTtrimKeyFields() { + public void testTrimKeyFields() { List keyFields = new ArrayList<>(); List allFields = new ArrayList<>(); From a90641475aa504699c62affa5c1cb5c298db2426 Mon Sep 17 00:00:00 2001 From: YeJunHao <41894543+leaves12138@users.noreply.github.com> Date: Mon, 9 Dec 2024 16:46:46 +0800 Subject: [PATCH 08/17] Update paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java Co-authored-by: tsreaper --- .../main/java/org/apache/paimon/utils/BulkFormatMapping.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index d5d28ea4519f..831c6ddbcc91 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -207,8 +207,8 @@ static Pair trimKeyFields( if (positionMap.containsKey(id)) { map[i] = positionMap.get(id); } else { - trimmedFields.add(keyField ? f : field); - map[i] = positionMap.computeIfAbsent(id, k -> index.getAndIncrement()); + map[i] = positionMap.put(id, trimmedFields.size()); + trimmedFields.add(f); } } else { throw new RuntimeException("Can't find field with id: " + id + " in fields."); From 8cee400a31f7a0f94f4326c66b56e844f1c9a826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Mon, 9 Dec 2024 17:22:02 +0800 Subject: [PATCH 09/17] fix comment --- .../data/columnar/ColumnarRowIterator.java | 13 ++-- .../paimon/io/DataFileRecordReader.java | 24 +++---- .../paimon/io/KeyValueFileReaderFactory.java | 5 +- .../paimon/operation/RawFileSplitRead.java | 5 +- .../paimon/utils/BulkFormatMapping.java | 62 ++++++++++++------- 5 files changed, 57 insertions(+), 52 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java index a0d7d876658e..cfeb06669679 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java @@ -87,20 +87,15 @@ public ColumnarRowIterator copy(ColumnVector[] vectors) { } public ColumnarRowIterator mapping( - @Nullable int[] trimmedKeyMapping, - @Nullable PartitionInfo partitionInfo, - @Nullable int[] indexMapping) { - if (trimmedKeyMapping != null || partitionInfo != null || indexMapping != null) { + @Nullable PartitionInfo partitionInfo, @Nullable int[] columnMapping) { + if (partitionInfo != null || columnMapping != null) { VectorizedColumnBatch vectorizedColumnBatch = row.batch(); ColumnVector[] vectors = vectorizedColumnBatch.columns; - if (trimmedKeyMapping != null) { - vectors = VectorMappingUtils.createMappedVectors(trimmedKeyMapping, vectors); - } if (partitionInfo != null) { vectors = VectorMappingUtils.createPartitionMappedVectors(partitionInfo, vectors); } - if (indexMapping != null) { - vectors = VectorMappingUtils.createMappedVectors(indexMapping, vectors); + if (columnMapping != null) { + vectors = VectorMappingUtils.createMappedVectors(columnMapping, vectors); } return copy(vectors); } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java index 8548ec2bc8f5..228605f6859b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java @@ -38,18 +38,16 @@ public class DataFileRecordReader implements FileRecordReader { private final FileRecordReader reader; - @Nullable private final int[] indexMapping; + @Nullable private final int[] columnMapping; @Nullable private final PartitionInfo partitionInfo; @Nullable private final CastFieldGetter[] castMapping; - @Nullable private final int[] trimmedKeyMapping; public DataFileRecordReader( FormatReaderFactory readerFactory, FormatReaderFactory.Context context, - @Nullable int[] indexMapping, + @Nullable int[] columnMapping, @Nullable CastFieldGetter[] castMapping, - @Nullable PartitionInfo partitionInfo, - @Nullable int[] trimmedKeyMapping) + @Nullable PartitionInfo partitionInfo) throws IOException { try { this.reader = readerFactory.createReader(context); @@ -57,10 +55,9 @@ public DataFileRecordReader( FileUtils.checkExists(context.fileIO(), context.filePath()); throw e; } - this.indexMapping = indexMapping; + this.columnMapping = columnMapping; this.partitionInfo = partitionInfo; this.castMapping = castMapping; - this.trimmedKeyMapping = trimmedKeyMapping; } @Nullable @@ -72,21 +69,16 @@ public FileRecordIterator readBatch() throws IOException { } if (iterator instanceof ColumnarRowIterator) { - iterator = - ((ColumnarRowIterator) iterator) - .mapping(trimmedKeyMapping, partitionInfo, indexMapping); + iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo, columnMapping); } else { - if (trimmedKeyMapping != null) { - final ProjectedRow projectedRow = ProjectedRow.from(trimmedKeyMapping); - iterator = iterator.transform(projectedRow::replaceRow); - } if (partitionInfo != null) { final PartitionSettedRow partitionSettedRow = PartitionSettedRow.from(partitionInfo); iterator = iterator.transform(partitionSettedRow::replaceRow); } - if (indexMapping != null) { - final ProjectedRow projectedRow = ProjectedRow.from(indexMapping); + + if (columnMapping != null) { + final ProjectedRow projectedRow = ProjectedRow.from(columnMapping); iterator = iterator.transform(projectedRow::replaceRow); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 26b272d772e7..17d94f968fb2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -142,10 +142,9 @@ private FileRecordReader createRecordReader( ? new FormatReaderContext(fileIO, filePath, fileSize) : new OrcFormatReaderContext( fileIO, filePath, fileSize, orcPoolSize), - bulkFormatMapping.getIndexMapping(), + bulkFormatMapping.getColumnMapping(), bulkFormatMapping.getCastMapping(), - PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition), - bulkFormatMapping.getTrimmedKeyMapping()); + PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); Optional deletionVector = dvFactory.create(fileName); if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 6cc3006175b6..c3cf16275a61 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -219,10 +219,9 @@ private FileRecordReader createFileReader( new DataFileRecordReader( bulkFormatMapping.getReaderFactory(), formatReaderContext, - bulkFormatMapping.getIndexMapping(), + bulkFormatMapping.getColumnMapping(), bulkFormatMapping.getCastMapping(), - PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition), - bulkFormatMapping.getTrimmedKeyMapping()); + PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); if (fileIndexResult instanceof BitmapIndexResult) { fileRecordReader = diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index 831c6ddbcc91..41baabeb013a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -51,7 +51,7 @@ public class BulkFormatMapping { // index mapping from data schema fields to table schema fields, this is used to realize paimon // schema evolution - @Nullable private final int[] indexMapping; + @Nullable private final int[] columnMapping; // help indexMapping to cast defferent data type @Nullable private final CastFieldGetter[] castMapping; // partition fields mapping, add partition fields to the read fields @@ -65,12 +65,12 @@ public class BulkFormatMapping { public BulkFormatMapping( @Nullable int[] indexMapping, @Nullable CastFieldGetter[] castMapping, - @Nullable Pair partitionPair, @Nullable int[] trimmedKeyMapping, + @Nullable Pair partitionPair, FormatReaderFactory bulkFormat, TableSchema dataSchema, List dataFilters) { - this.indexMapping = indexMapping; + this.columnMapping = combine(indexMapping, trimmedKeyMapping); this.castMapping = castMapping; this.bulkFormat = bulkFormat; this.partitionPair = partitionPair; @@ -79,9 +79,29 @@ public BulkFormatMapping( this.dataFilters = dataFilters; } + private int[] combine(@Nullable int[] indexMapping, @Nullable int[] trimmedKeyMapping) { + if (indexMapping == null) { + return trimmedKeyMapping; + } + if (trimmedKeyMapping == null) { + return indexMapping; + } + + int[] combined = new int[indexMapping.length]; + + for (int i = 0; i < indexMapping.length; i++) { + if (indexMapping[i] < 0) { + combined[i] = indexMapping[i]; + } else { + combined[i] = trimmedKeyMapping[indexMapping[i]]; + } + } + return combined; + } + @Nullable - public int[] getIndexMapping() { - return indexMapping; + public int[] getColumnMapping() { + return columnMapping; } @Nullable @@ -94,11 +114,11 @@ public Pair getPartitionPair() { return partitionPair; } - @Nullable - public int[] getTrimmedKeyMapping() { - return trimmedKeyMapping; - } - + // @Nullable + // public int[] getTrimmedKeyMapping() { + // return trimmedKeyMapping; + // } + // public FormatReaderFactory getReaderFactory() { return bulkFormat; } @@ -155,19 +175,19 @@ public BulkFormatMapping build( IndexCastMapping indexCastMapping = SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields); + // map from key fields reading to value fields reading + Pair trimmedKeyPair = trimKeyFields(readDataFields, allDataFields); + // build partition mapping and filter partition fields Pair, List> partitionMappingAndFieldsWithoutPartitionPair = - PartitionUtils.constructPartitionMapping(dataSchema, readDataFields); + PartitionUtils.constructPartitionMapping( + dataSchema, trimmedKeyPair.getRight().getFields()); Pair partitionMapping = partitionMappingAndFieldsWithoutPartitionPair.getLeft(); - List fieldsWithoutPartition = - partitionMappingAndFieldsWithoutPartitionPair.getRight(); - - // map from key fields reading to value fields reading - Pair trimmedKeyPair = - trimKeyFields(fieldsWithoutPartition, allDataFields); + RowType readRowType = + new RowType(partitionMappingAndFieldsWithoutPartitionPair.getRight()); // build read filters List readFilters = readFilters(filters, tableSchema, dataSchema); @@ -175,11 +195,11 @@ public BulkFormatMapping build( return new BulkFormatMapping( indexCastMapping.getIndexMapping(), indexCastMapping.getCastMapping(), - partitionMapping, trimmedKeyPair.getLeft(), + partitionMapping, formatDiscover .discover(formatIdentifier) - .createReaderFactory(trimmedKeyPair.getRight(), readFilters), + .createReaderFactory(readRowType, readFilters), dataSchema, readFilters); } @@ -207,8 +227,8 @@ static Pair trimKeyFields( if (positionMap.containsKey(id)) { map[i] = positionMap.get(id); } else { - map[i] = positionMap.put(id, trimmedFields.size()); - trimmedFields.add(f); + trimmedFields.add(keyField ? f : field); + map[i] = positionMap.computeIfAbsent(id, k -> index.getAndIncrement()); } } else { throw new RuntimeException("Can't find field with id: " + id + " in fields."); From ee959970b26758f591457a4c54de30b641e03bdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Mon, 9 Dec 2024 17:23:54 +0800 Subject: [PATCH 10/17] fix comment --- .../apache/paimon/io/DataFileRecordReader.java | 12 ++++++------ .../paimon/io/KeyValueFileReaderFactory.java | 2 +- .../paimon/operation/RawFileSplitRead.java | 2 +- .../apache/paimon/utils/BulkFormatMapping.java | 16 ++++------------ 4 files changed, 12 insertions(+), 20 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java index 228605f6859b..16fad55a49a2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java @@ -38,14 +38,14 @@ public class DataFileRecordReader implements FileRecordReader { private final FileRecordReader reader; - @Nullable private final int[] columnMapping; + @Nullable private final int[] indexMapping; @Nullable private final PartitionInfo partitionInfo; @Nullable private final CastFieldGetter[] castMapping; public DataFileRecordReader( FormatReaderFactory readerFactory, FormatReaderFactory.Context context, - @Nullable int[] columnMapping, + @Nullable int[] indexMapping, @Nullable CastFieldGetter[] castMapping, @Nullable PartitionInfo partitionInfo) throws IOException { @@ -55,7 +55,7 @@ public DataFileRecordReader( FileUtils.checkExists(context.fileIO(), context.filePath()); throw e; } - this.columnMapping = columnMapping; + this.indexMapping = indexMapping; this.partitionInfo = partitionInfo; this.castMapping = castMapping; } @@ -69,7 +69,7 @@ public FileRecordIterator readBatch() throws IOException { } if (iterator instanceof ColumnarRowIterator) { - iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo, columnMapping); + iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo, indexMapping); } else { if (partitionInfo != null) { final PartitionSettedRow partitionSettedRow = @@ -77,8 +77,8 @@ public FileRecordIterator readBatch() throws IOException { iterator = iterator.transform(partitionSettedRow::replaceRow); } - if (columnMapping != null) { - final ProjectedRow projectedRow = ProjectedRow.from(columnMapping); + if (indexMapping != null) { + final ProjectedRow projectedRow = ProjectedRow.from(indexMapping); iterator = iterator.transform(projectedRow::replaceRow); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 17d94f968fb2..7d3acd729c55 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -142,7 +142,7 @@ private FileRecordReader createRecordReader( ? new FormatReaderContext(fileIO, filePath, fileSize) : new OrcFormatReaderContext( fileIO, filePath, fileSize, orcPoolSize), - bulkFormatMapping.getColumnMapping(), + bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index c3cf16275a61..46977457c4be 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -219,7 +219,7 @@ private FileRecordReader createFileReader( new DataFileRecordReader( bulkFormatMapping.getReaderFactory(), formatReaderContext, - bulkFormatMapping.getColumnMapping(), + bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition)); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index 41baabeb013a..3e6c849732e8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -51,13 +51,11 @@ public class BulkFormatMapping { // index mapping from data schema fields to table schema fields, this is used to realize paimon // schema evolution - @Nullable private final int[] columnMapping; + @Nullable private final int[] indexMapping; // help indexMapping to cast defferent data type @Nullable private final CastFieldGetter[] castMapping; // partition fields mapping, add partition fields to the read fields @Nullable private final Pair partitionPair; - // key fields mapping, add key fields to the read fields - @Nullable private final int[] trimmedKeyMapping; private final FormatReaderFactory bulkFormat; private final TableSchema dataSchema; private final List dataFilters; @@ -70,11 +68,10 @@ public BulkFormatMapping( FormatReaderFactory bulkFormat, TableSchema dataSchema, List dataFilters) { - this.columnMapping = combine(indexMapping, trimmedKeyMapping); + this.indexMapping = combine(indexMapping, trimmedKeyMapping); this.castMapping = castMapping; this.bulkFormat = bulkFormat; this.partitionPair = partitionPair; - this.trimmedKeyMapping = trimmedKeyMapping; this.dataSchema = dataSchema; this.dataFilters = dataFilters; } @@ -100,8 +97,8 @@ private int[] combine(@Nullable int[] indexMapping, @Nullable int[] trimmedKeyMa } @Nullable - public int[] getColumnMapping() { - return columnMapping; + public int[] getIndexMapping() { + return indexMapping; } @Nullable @@ -114,11 +111,6 @@ public Pair getPartitionPair() { return partitionPair; } - // @Nullable - // public int[] getTrimmedKeyMapping() { - // return trimmedKeyMapping; - // } - // public FormatReaderFactory getReaderFactory() { return bulkFormat; } From 742d478a982b84e013a1ab30e2c3466ead583850 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Mon, 9 Dec 2024 17:41:09 +0800 Subject: [PATCH 11/17] comment --- .../org/apache/paimon/utils/BulkFormatMapping.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index 3e6c849732e8..6623b20b8e76 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -49,10 +49,11 @@ /** Class with index mapping and bulk format. */ public class BulkFormatMapping { - // index mapping from data schema fields to table schema fields, this is used to realize paimon - // schema evolution + // Index mapping from data schema fields to table schema fields, this is used to realize paimon + // schema evolution. And it combines trimeedKeyMapping, which maps key fields to the value + // fields @Nullable private final int[] indexMapping; - // help indexMapping to cast defferent data type + // help indexMapping to cast different data type @Nullable private final CastFieldGetter[] castMapping; // partition fields mapping, add partition fields to the read fields @Nullable private final Pair partitionPair; @@ -147,15 +148,12 @@ public BulkFormatMappingBuilder( * *

1. Calculate the readDataFields, which is what we intend to read from the data schema. * Meanwhile, generate the indexCastMapping, which is used to map the index of the - * readDataFields to the index of the data schema. + * readDataFields to the index of the data schema. Also, this mapping combined + * trimmedKeyMapping(whith maps the _KEY_xxx fields to xxx fields.) * *

2. We want read much fewer fields than readDataFields, so we kick out the partition * fields. We generate the partitionMappingAndFieldsWithoutPartitionPair which helps reduce * the real read fields and tell us how to map it back. - * - *

3. We still want read fewer fields, so we combine the _KEY_xxx fields to xxx fields. - * They are always the same, we just need to get once. We generate trimmedKeyPair to reduce - * the real read fields again, also it tells us how to map it back. */ public BulkFormatMapping build( String formatIdentifier, TableSchema tableSchema, TableSchema dataSchema) { From d07a2c8e8c8711c71027a04480f92461f18225cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 10:01:55 +0800 Subject: [PATCH 12/17] comment --- .../apache/paimon/data/columnar/ColumnarRowIterator.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java index cfeb06669679..874c22134864 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java @@ -87,15 +87,15 @@ public ColumnarRowIterator copy(ColumnVector[] vectors) { } public ColumnarRowIterator mapping( - @Nullable PartitionInfo partitionInfo, @Nullable int[] columnMapping) { - if (partitionInfo != null || columnMapping != null) { + @Nullable PartitionInfo partitionInfo, @Nullable int[] indexMapping) { + if (partitionInfo != null || indexMapping != null) { VectorizedColumnBatch vectorizedColumnBatch = row.batch(); ColumnVector[] vectors = vectorizedColumnBatch.columns; if (partitionInfo != null) { vectors = VectorMappingUtils.createPartitionMappedVectors(partitionInfo, vectors); } - if (columnMapping != null) { - vectors = VectorMappingUtils.createMappedVectors(columnMapping, vectors); + if (indexMapping != null) { + vectors = VectorMappingUtils.createMappedVectors(indexMapping, vectors); } return copy(vectors); } From 3524288440460f2790b33bf2580e89c8c5d3cdfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 10:41:22 +0800 Subject: [PATCH 13/17] fix comment --- .../paimon/utils/BulkFormatMapping.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index 6623b20b8e76..b03ac056f161 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -40,7 +40,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields; @@ -148,10 +147,16 @@ public BulkFormatMappingBuilder( * *

1. Calculate the readDataFields, which is what we intend to read from the data schema. * Meanwhile, generate the indexCastMapping, which is used to map the index of the - * readDataFields to the index of the data schema. Also, this mapping combined - * trimmedKeyMapping(whith maps the _KEY_xxx fields to xxx fields.) + * readDataFields to the index of the data schema. * - *

2. We want read much fewer fields than readDataFields, so we kick out the partition + *

2. Calculate the mapping to trim _KEY_ fields. For example: we want _KEY_a, _KEY_b, + * _FIELD_SEQUENCE, _ROW_KIND, a, b, c, d, e, f, g from the data, but actually we don't need + * to read _KEY_a and a, _KEY_b and b the same time, so we need to trim them. So we mapping + * it: read before: _KEY_a, _KEY_b, _FIELD_SEQUENCE, _ROW_KIND, a, b, c, d, e, f, g read + * after: a, b, _FIELD_SEQUENCE, _ROW_KIND, c, d, e, f, g and the mapping is + * [0,1,2,3,0,1,4,5,6,7,8], it converts the [read after] columns to [read before] columns. + * + *

3. We want read much fewer fields than readDataFields, so we kick out the partition * fields. We generate the partitionMappingAndFieldsWithoutPartitionPair which helps reduce * the real read fields and tell us how to map it back. */ @@ -205,7 +210,6 @@ static Pair trimKeyFields( fieldMap.put(field.id(), field); } - AtomicInteger index = new AtomicInteger(); for (int i = 0; i < fieldsWithoutPartition.size(); i++) { DataField field = fieldsWithoutPartition.get(i); boolean keyField = SpecialFields.isKeyField(field.name()); @@ -217,8 +221,12 @@ static Pair trimKeyFields( if (positionMap.containsKey(id)) { map[i] = positionMap.get(id); } else { + map[i] = positionMap.computeIfAbsent(id, k -> trimmedFields.size()); + // If the target field is not key field, we remain what it is, because it + // may be projected. Ex: the target field is a row type, but only read the + // few fields in it. If we simply + // trimmedFields.add(f), we will read more fields than we need. trimmedFields.add(keyField ? f : field); - map[i] = positionMap.computeIfAbsent(id, k -> index.getAndIncrement()); } } else { throw new RuntimeException("Can't find field with id: " + id + " in fields."); From 3dc6eb1555a1af3380cb35ffca37b31f1ebe5d2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 10:44:11 +0800 Subject: [PATCH 14/17] fix comment --- .../main/java/org/apache/paimon/utils/BulkFormatMapping.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index b03ac056f161..6a45f5531cb6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -223,7 +223,8 @@ static Pair trimKeyFields( } else { map[i] = positionMap.computeIfAbsent(id, k -> trimmedFields.size()); // If the target field is not key field, we remain what it is, because it - // may be projected. Ex: the target field is a row type, but only read the + // may be projected. Example: the target field is a row type, but only read + // the // few fields in it. If we simply // trimmedFields.add(f), we will read more fields than we need. trimmedFields.add(keyField ? f : field); From c5d6be400f34f37c7e176b0d9dc29ac01886a55a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 10:45:26 +0800 Subject: [PATCH 15/17] fix comment --- .../main/java/org/apache/paimon/utils/BulkFormatMapping.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index 6a45f5531cb6..2e33a68ea37c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -224,8 +224,7 @@ static Pair trimKeyFields( map[i] = positionMap.computeIfAbsent(id, k -> trimmedFields.size()); // If the target field is not key field, we remain what it is, because it // may be projected. Example: the target field is a row type, but only read - // the - // few fields in it. If we simply + // the few fields in it. If we simply // trimmedFields.add(f), we will read more fields than we need. trimmedFields.add(keyField ? f : field); } From 9dd2014a11fa16db8dae6c845dcf8aad30c75ed4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 10:45:50 +0800 Subject: [PATCH 16/17] fix comment --- .../main/java/org/apache/paimon/utils/BulkFormatMapping.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index 2e33a68ea37c..58ef924df178 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -224,8 +224,8 @@ static Pair trimKeyFields( map[i] = positionMap.computeIfAbsent(id, k -> trimmedFields.size()); // If the target field is not key field, we remain what it is, because it // may be projected. Example: the target field is a row type, but only read - // the few fields in it. If we simply - // trimmedFields.add(f), we will read more fields than we need. + // the few fields in it. If we simply trimmedFields.add(f), we will read + // more fields than we need. trimmedFields.add(keyField ? f : field); } } else { From a6c1b0bc614f219d5810c108bc1f3fd3a3e81adf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 10 Dec 2024 11:39:31 +0800 Subject: [PATCH 17/17] fix comment --- .../paimon/utils/BulkFormatMappingTest.java | 54 ++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java index dc870cd39caa..4d5d6e32e85d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/BulkFormatMappingTest.java @@ -18,6 +18,8 @@ package org.apache.paimon.utils; +import org.apache.paimon.schema.IndexCastMapping; +import org.apache.paimon.schema.SchemaEvolutionUtil; import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -34,7 +36,6 @@ public class BulkFormatMappingTest { @Test public void testTrimKeyFields() { - List keyFields = new ArrayList<>(); List allFields = new ArrayList<>(); List testFields = new ArrayList<>(); @@ -92,4 +93,55 @@ public void testTrimKeyFields() { Assertions.assertThat(fields.get(4).id()).isEqualTo(4); Assertions.assertThat(fields.get(5).id()).isEqualTo(6); } + + @Test + public void testTrimKeyWithIndexMapping() { + List readTableFields = new ArrayList<>(); + List readDataFields = new ArrayList<>(); + + readTableFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + 1, + SpecialFields.KEY_FIELD_PREFIX + "a", + DataTypes.STRING())); + readTableFields.add(new DataField(0, "0", DataTypes.STRING())); + readTableFields.add(new DataField(1, "a", DataTypes.STRING())); + readTableFields.add(new DataField(2, "2", DataTypes.STRING())); + readTableFields.add(new DataField(3, "3", DataTypes.STRING())); + + readDataFields.add( + new DataField( + SpecialFields.KEY_FIELD_ID_START + 1, + SpecialFields.KEY_FIELD_PREFIX + "a", + DataTypes.STRING())); + readDataFields.add(new DataField(0, "0", DataTypes.STRING())); + readDataFields.add(new DataField(1, "a", DataTypes.STRING())); + readDataFields.add(new DataField(3, "3", DataTypes.STRING())); + + // build index cast mapping + IndexCastMapping indexCastMapping = + SchemaEvolutionUtil.createIndexCastMapping(readTableFields, readDataFields); + + // map from key fields reading to value fields reading + Pair trimmedKeyPair = + BulkFormatMapping.BulkFormatMappingBuilder.trimKeyFields( + readDataFields, readDataFields); + + BulkFormatMapping bulkFormatMapping = + new BulkFormatMapping( + indexCastMapping.getIndexMapping(), + indexCastMapping.getCastMapping(), + trimmedKeyPair.getLeft(), + null, + null, + null, + null); + + Assertions.assertThat(bulkFormatMapping.getIndexMapping()).containsExactly(0, 1, 0, -1, 2); + List trimmed = trimmedKeyPair.getRight().getFields(); + Assertions.assertThat(trimmed.get(0).id()).isEqualTo(1); + Assertions.assertThat(trimmed.get(1).id()).isEqualTo(0); + Assertions.assertThat(trimmed.get(2).id()).isEqualTo(3); + Assertions.assertThat(trimmed.size()).isEqualTo(3); + } }