From 623c327206631e8bef40c9a6430fc22b22463760 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Fri, 15 Mar 2024 16:54:41 +0800 Subject: [PATCH 1/4] [core] Reduce schema read for readers --- .../org/apache/paimon/AbstractFileStore.java | 7 +++--- .../apache/paimon/AppendOnlyFileStore.java | 11 ++++---- .../org/apache/paimon/KeyValueFileStore.java | 14 +++++------ .../paimon/io/KeyValueFileReaderFactory.java | 25 ++++++++++--------- .../operation/AbstractFileStoreScan.java | 6 ++++- .../operation/AppendOnlyFileStoreRead.java | 15 ++++++----- .../operation/AppendOnlyFileStoreScan.java | 6 +++-- .../operation/KeyValueFileStoreRead.java | 6 ++--- .../operation/KeyValueFileStoreScan.java | 10 +++++--- .../operation/KeyValueFileStoreWrite.java | 7 +++--- .../table/AppendOnlyFileStoreTable.java | 2 +- .../table/PrimaryKeyFileStoreTable.java | 2 +- .../java/org/apache/paimon/TestFileStore.java | 8 ++++-- .../paimon/io/KeyValueFileReadWriteTest.java | 2 +- .../paimon/mergetree/ContainsLevelsTest.java | 2 +- .../paimon/mergetree/LookupLevelsTest.java | 2 +- .../paimon/mergetree/MergeTreeTestBase.java | 2 +- .../source/TestChangelogDataReadWrite.java | 6 ++--- 18 files changed, 75 insertions(+), 58 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 6413cd088ede..87cc4e65c544 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -34,6 +34,7 @@ import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.options.MemorySize; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.service.ServiceManager; import org.apache.paimon.stats.StatsFile; import org.apache.paimon.stats.StatsFileHandler; @@ -65,7 +66,7 @@ public abstract class AbstractFileStore implements FileStore { protected final FileIO fileIO; protected final SchemaManager schemaManager; - protected final long schemaId; + protected final TableSchema schema; protected final CoreOptions options; protected final RowType partitionType; private final CatalogEnvironment catalogEnvironment; @@ -75,13 +76,13 @@ public abstract class AbstractFileStore implements FileStore { public AbstractFileStore( FileIO fileIO, SchemaManager schemaManager, - long schemaId, + TableSchema schema, CoreOptions options, RowType partitionType, CatalogEnvironment catalogEnvironment) { this.fileIO = fileIO; this.schemaManager = schemaManager; - this.schemaId = schemaId; + this.schema = schema; this.options = options; this.partitionType = partitionType; this.catalogEnvironment = catalogEnvironment; diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 8be8f817841d..0d546e2154d5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -28,6 +28,7 @@ import org.apache.paimon.operation.ScanBucketFilter; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.types.RowType; @@ -50,14 +51,14 @@ public class AppendOnlyFileStore extends AbstractFileStore { public AppendOnlyFileStore( FileIO fileIO, SchemaManager schemaManager, - long schemaId, + TableSchema schema, CoreOptions options, RowType partitionType, RowType bucketKeyType, RowType rowType, String tableName, CatalogEnvironment catalogEnvironment) { - super(fileIO, schemaManager, schemaId, options, partitionType, catalogEnvironment); + super(fileIO, schemaManager, schema, options, partitionType, catalogEnvironment); this.bucketKeyType = bucketKeyType; this.rowType = rowType; this.tableName = tableName; @@ -82,7 +83,7 @@ public AppendOnlyFileStoreRead newRead() { return new AppendOnlyFileStoreRead( fileIO, schemaManager, - schemaId, + schema, rowType, FileFormatDiscover.of(options), pathFactory()); @@ -99,7 +100,7 @@ public AppendOnlyFileStoreWrite newWrite( return new AppendOnlyFileStoreWrite( fileIO, newRead(), - schemaId, + schema.id(), commitUser, rowType, pathFactory(), @@ -138,7 +139,7 @@ public void pushdown(Predicate predicate) { bucketFilter, snapshotManager(), schemaManager, - schemaId, + schema, manifestFileFactory(forWrite), manifestListFactory(forWrite), options.bucket(), diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index a44c91ba54c5..d80bd6d3979d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -35,6 +35,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.types.RowType; @@ -75,7 +76,7 @@ public class KeyValueFileStore extends AbstractFileStore { public KeyValueFileStore( FileIO fileIO, SchemaManager schemaManager, - long schemaId, + TableSchema schema, boolean crossPartitionUpdate, CoreOptions options, RowType partitionType, @@ -86,7 +87,7 @@ public KeyValueFileStore( MergeFunctionFactory mfFactory, String tableName, CatalogEnvironment catalogEnvironment) { - super(fileIO, schemaManager, schemaId, options, partitionType, catalogEnvironment); + super(fileIO, schemaManager, schema, options, partitionType, catalogEnvironment); this.crossPartitionUpdate = crossPartitionUpdate; this.bucketKeyType = bucketKeyType; this.keyType = keyType; @@ -121,8 +122,7 @@ public KeyValueFileStoreScan newScan(String branchName) { public KeyValueFileStoreRead newRead() { return new KeyValueFileStoreRead( options, - schemaManager, - schemaId, + schema, keyType, valueType, newKeyComparator(), @@ -134,7 +134,7 @@ public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() { return KeyValueFileReaderFactory.builder( fileIO, schemaManager, - schemaId, + schema, keyType, valueType, FileFormatDiscover.of(options), @@ -162,7 +162,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma return new KeyValueFileStoreWrite( fileIO, schemaManager, - schemaId, + schema, commitUser, keyType, valueType, @@ -221,7 +221,7 @@ public void pushdown(Predicate keyFilter) { bucketFilter, snapshotManager(), schemaManager, - schemaId, + schema, keyValueFieldsExtractor, manifestFileFactory(forWrite), manifestListFactory(forWrite), diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 63fef31fc142..cc7534e9aed6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -31,6 +31,7 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.AsyncRecordReader; import org.apache.paimon.utils.BulkFormatMapping; @@ -52,7 +53,7 @@ public class KeyValueFileReaderFactory { private final FileIO fileIO; private final SchemaManager schemaManager; - private final long schemaId; + private final TableSchema schema; private final RowType keyType; private final RowType valueType; @@ -67,7 +68,7 @@ public class KeyValueFileReaderFactory { private KeyValueFileReaderFactory( FileIO fileIO, SchemaManager schemaManager, - long schemaId, + TableSchema schema, RowType keyType, RowType valueType, BulkFormatMapping.BulkFormatMappingBuilder bulkFormatMappingBuilder, @@ -77,7 +78,7 @@ private KeyValueFileReaderFactory( DeletionVector.Factory dvFactory) { this.fileIO = fileIO; this.schemaManager = schemaManager; - this.schemaId = schemaId; + this.schema = schema; this.keyType = keyType; this.valueType = valueType; this.bulkFormatMappingBuilder = bulkFormatMappingBuilder; @@ -110,8 +111,8 @@ private RecordReader createRecordReader( () -> bulkFormatMappingBuilder.build( formatIdentifier, - schemaManager.schema(this.schemaId), - schemaManager.schema(schemaId)); + schema, + schemaId == schema.id() ? schema : schemaManager.schema(schemaId)); BulkFormatMapping bulkFormatMapping = reuseFormat @@ -141,7 +142,7 @@ private RecordReader createRecordReader( public static Builder builder( FileIO fileIO, SchemaManager schemaManager, - long schemaId, + TableSchema schema, RowType keyType, RowType valueType, FileFormatDiscover formatDiscover, @@ -151,7 +152,7 @@ public static Builder builder( return new Builder( fileIO, schemaManager, - schemaId, + schema, keyType, valueType, formatDiscover, @@ -165,7 +166,7 @@ public static class Builder { private final FileIO fileIO; private final SchemaManager schemaManager; - private final long schemaId; + private final TableSchema schema; private final RowType keyType; private final RowType valueType; private final FileFormatDiscover formatDiscover; @@ -182,7 +183,7 @@ public static class Builder { private Builder( FileIO fileIO, SchemaManager schemaManager, - long schemaId, + TableSchema schema, RowType keyType, RowType valueType, FileFormatDiscover formatDiscover, @@ -191,7 +192,7 @@ private Builder( CoreOptions options) { this.fileIO = fileIO; this.schemaManager = schemaManager; - this.schemaId = schemaId; + this.schema = schema; this.keyType = keyType; this.valueType = valueType; this.formatDiscover = formatDiscover; @@ -209,7 +210,7 @@ public Builder copyWithoutProjection() { return new Builder( fileIO, schemaManager, - schemaId, + schema, keyType, valueType, formatDiscover, @@ -255,7 +256,7 @@ public KeyValueFileReaderFactory build( return new KeyValueFileReaderFactory( fileIO, schemaManager, - schemaId, + schema, projectedKeyType, projectedValueType, BulkFormatMapping.newBuilder( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index e6c95e8854b5..52983f4b6e2b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -72,6 +72,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private final ConcurrentMap tableSchemas; private final SchemaManager schemaManager; + private final TableSchema schema; protected final ScanBucketFilter bucketKeyFilter; private final String branchName; @@ -91,6 +92,7 @@ public AbstractFileStoreScan( ScanBucketFilter bucketKeyFilter, SnapshotManager snapshotManager, SchemaManager schemaManager, + TableSchema schema, ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, int numOfBuckets, @@ -101,6 +103,7 @@ public AbstractFileStoreScan( this.bucketKeyFilter = bucketKeyFilter; this.snapshotManager = snapshotManager; this.schemaManager = schemaManager; + this.schema = schema; this.manifestFileFactory = manifestFileFactory; this.manifestList = manifestListFactory.create(); this.numOfBuckets = numOfBuckets; @@ -407,7 +410,8 @@ private List readManifests(Snapshot snapshot) { /** Note: Keep this thread-safe. */ protected TableSchema scanTableSchema(long id) { - return tableSchemas.computeIfAbsent(id, key -> schemaManager.schema(id)); + return tableSchemas.computeIfAbsent( + id, key -> key == schema.id() ? schema : schemaManager.schema(id)); } /** Note: Keep this thread-safe. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java index c06cce45899a..8363c297ad7b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java @@ -62,7 +62,7 @@ public class AppendOnlyFileStoreRead implements FileStoreRead { private final FileIO fileIO; private final SchemaManager schemaManager; - private final long schemaId; + private final TableSchema schema; private final FileFormatDiscover formatDiscover; private final FileStorePathFactory pathFactory; private final Map bulkFormatMappings; @@ -74,13 +74,13 @@ public class AppendOnlyFileStoreRead implements FileStoreRead { public AppendOnlyFileStoreRead( FileIO fileIO, SchemaManager schemaManager, - long schemaId, + TableSchema schema, RowType rowType, FileFormatDiscover formatDiscover, FileStorePathFactory pathFactory) { this.fileIO = fileIO; this.schemaManager = schemaManager; - this.schemaId = schemaId; + this.schema = schema; this.formatDiscover = formatDiscover; this.pathFactory = pathFactory; this.bulkFormatMappings = new HashMap<>(); @@ -113,8 +113,11 @@ public RecordReader createReader(DataSplit split) throws IOExceptio bulkFormatMappings.computeIfAbsent( new FormatKey(file.schemaId(), formatIdentifier), key -> { - TableSchema tableSchema = schemaManager.schema(this.schemaId); - TableSchema dataSchema = schemaManager.schema(key.schemaId); + TableSchema tableSchema = schema; + TableSchema dataSchema = + key.schemaId == schema.id() + ? schema + : schemaManager.schema(key.schemaId); // projection to data schema int[][] dataProjection = @@ -131,7 +134,7 @@ public RecordReader createReader(DataSplit split) throws IOExceptio dataSchema.fields()); List dataFilters = - this.schemaId == key.schemaId + this.schema.id() == key.schemaId ? filters : SchemaEvolutionUtil.createDataFilters( tableSchema.fields(), diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 2cec4e064975..866c87d75f66 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -24,6 +24,7 @@ import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.BinaryTableStats; import org.apache.paimon.stats.FieldStatsArraySerializer; import org.apache.paimon.stats.FieldStatsConverters; @@ -44,7 +45,7 @@ public AppendOnlyFileStoreScan( ScanBucketFilter bucketFilter, SnapshotManager snapshotManager, SchemaManager schemaManager, - long schemaId, + TableSchema schema, ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, int numOfBuckets, @@ -56,6 +57,7 @@ public AppendOnlyFileStoreScan( bucketFilter, snapshotManager, schemaManager, + schema, manifestFileFactory, manifestListFactory, numOfBuckets, @@ -63,7 +65,7 @@ public AppendOnlyFileStoreScan( scanManifestParallelism, branchName); this.fieldStatsConverters = - new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schemaId); + new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id()); } public AppendOnlyFileStoreScan withFilter(Predicate predicate) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java index db4c0ac926ca..0e115fdddc87 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java @@ -41,7 +41,6 @@ import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DeletionFile; @@ -89,14 +88,13 @@ public class KeyValueFileStoreRead implements FileStoreRead { public KeyValueFileStoreRead( CoreOptions options, - SchemaManager schemaManager, - long schemaId, + TableSchema schema, RowType keyType, RowType valueType, Comparator keyComparator, MergeFunctionFactory mfFactory, KeyValueFileReaderFactory.Builder readerFactoryBuilder) { - this.tableSchema = schemaManager.schema(schemaId); + this.tableSchema = schema; this.readerFactoryBuilder = readerFactoryBuilder; this.fileIO = readerFactoryBuilder.fileIO(); this.keyComparator = keyComparator; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 8e37d1e4fac1..0f34cac5a138 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -25,6 +25,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.BinaryTableStats; import org.apache.paimon.stats.FieldStatsArraySerializer; import org.apache.paimon.stats.FieldStatsConverters; @@ -50,7 +51,7 @@ public KeyValueFileStoreScan( ScanBucketFilter bucketFilter, SnapshotManager snapshotManager, SchemaManager schemaManager, - long schemaId, + TableSchema schema, KeyValueFieldsExtractor keyValueFieldsExtractor, ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, @@ -64,6 +65,7 @@ public KeyValueFileStoreScan( bucketFilter, snapshotManager, schemaManager, + schema, manifestFileFactory, manifestListFactory, numOfBuckets, @@ -72,10 +74,12 @@ public KeyValueFileStoreScan( branchName); this.fieldKeyStatsConverters = new FieldStatsConverters( - sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schemaId); + sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), + schema.id()); this.fieldValueStatsConverters = new FieldStatsConverters( - sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)), schemaId); + sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)), + schema.id()); this.deletionVectorsEnabled = deletionVectorsEnabled; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 41b77d612885..e06ed9ea26b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -60,6 +60,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.FieldsComparator; @@ -102,7 +103,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite { public KeyValueFileStoreWrite( FileIO fileIO, SchemaManager schemaManager, - long schemaId, + TableSchema schema, String commitUser, RowType keyType, RowType valueType, @@ -135,7 +136,7 @@ public KeyValueFileStoreWrite( KeyValueFileReaderFactory.builder( fileIO, schemaManager, - schemaId, + schema, keyType, valueType, FileFormatDiscover.of(options), @@ -145,7 +146,7 @@ public KeyValueFileStoreWrite( this.writerFactoryBuilder = KeyValueFileWriterFactory.builder( fileIO, - schemaId, + schema.id(), keyType, valueType, options.fileFormat(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 9c97d406cb7e..4d91328f24c7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -77,7 +77,7 @@ public AppendOnlyFileStore store() { new AppendOnlyFileStore( fileIO, schemaManager(), - tableSchema.id(), + tableSchema, new CoreOptions(tableSchema.options()), tableSchema.logicalPartitionType(), tableSchema.logicalBucketKeyType(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index 7b30fb832ef0..f35afc64d5c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -98,7 +98,7 @@ public KeyValueFileStore store() { new KeyValueFileStore( fileIO(), schemaManager(), - tableSchema.id(), + tableSchema, tableSchema.crossPartitionUpdate(), options, tableSchema.logicalPartitionType(), diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index d9cc7f87866a..c39cc6a5f4b8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -106,8 +106,8 @@ private TestFileStore( MergeFunctionFactory mfFactory) { super( FileIOFinder.find(new Path(root)), - new SchemaManager(FileIOFinder.find(new Path(root)), options.path()), - 0L, + schemaManager(root, options), + schemaManager(root, options).schema(0), false, options, partitionType, @@ -127,6 +127,10 @@ private TestFileStore( this.commitIdentifier = 0L; } + private static SchemaManager schemaManager(String root, CoreOptions options) { + return new SchemaManager(FileIOFinder.find(new Path(root)), options.path()); + } + public AbstractFileStoreWrite newWrite() { return super.newWrite(commitUser); } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 05f260097a4d..ca7f75d6eb57 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -282,7 +282,7 @@ private KeyValueFileReaderFactory createReaderFactory( KeyValueFileReaderFactory.builder( fileIO, createTestSchemaManager(path), - 0, + createTestSchemaManager(path).schema(0), KEY_TYPE, DEFAULT_ROW_TYPE, ignore -> new FlushingFileFormat(format), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index 58d9dbe904d9..d53b94155229 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -235,7 +235,7 @@ private KeyValueFileReaderFactory createReaderFactory() { KeyValueFileReaderFactory.builder( FileIOFinder.find(path), createSchemaManager(path), - 0, + createSchemaManager(path).schema(0), keyType, rowType, ignore -> new FlushingFileFormat("avro"), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index 00d8eeb5a8e4..f4d27784a56d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -315,7 +315,7 @@ private KeyValueFileReaderFactory createReaderFactory() { KeyValueFileReaderFactory.builder( FileIOFinder.find(path), createSchemaManager(path), - 0, + createSchemaManager(path).schema(0), keyType, rowType, ignore -> new FlushingFileFormat("avro"), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index f6ddf74ea699..a3bcc9bfa32c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -147,7 +147,7 @@ private void recreateMergeTree(long targetFileSize) { KeyValueFileReaderFactory.builder( LocalFileIO.create(), createTestingSchemaManager(path), - 0, + createTestingSchemaManager(path).schema(0), keyType, valueType, ignore -> flushingAvro, diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 7ffa94809454..7bc89d4d4d66 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -119,13 +119,11 @@ private TableRead createRead( RecordReader.RecordIterator> rowDataIteratorCreator) { SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath); - long schemaId = 0; CoreOptions options = new CoreOptions(new HashMap<>()); KeyValueFileStoreRead read = new KeyValueFileStoreRead( options, - schemaManager, - schemaId, + schemaManager.schema(0), KEY_TYPE, VALUE_TYPE, COMPARATOR, @@ -133,7 +131,7 @@ private TableRead createRead( KeyValueFileReaderFactory.builder( LocalFileIO.create(), schemaManager, - schemaId, + 0, KEY_TYPE, VALUE_TYPE, ignore -> avro, From 672ce651d8cf2c170b76a370e251628526d3ffc5 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Fri, 15 Mar 2024 17:02:21 +0800 Subject: [PATCH 2/4] fix --- .../paimon/flink/source/TestChangelogDataReadWrite.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 7bc89d4d4d66..d51aefb962c5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -131,7 +131,7 @@ private TableRead createRead( KeyValueFileReaderFactory.builder( LocalFileIO.create(), schemaManager, - 0, + schemaManager.schema(0), KEY_TYPE, VALUE_TYPE, ignore -> avro, @@ -175,11 +175,12 @@ public RecordWriter createMergeTreeWriter(BinaryRow partition, int buc Map pathFactoryMap = new HashMap<>(); pathFactoryMap.put("avro", pathFactory); + SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath); RecordWriter writer = new KeyValueFileStoreWrite( LocalFileIO.create(), - new SchemaManager(LocalFileIO.create(), tablePath), - 0, + schemaManager, + schemaManager.schema(0), commitUser, KEY_TYPE, VALUE_TYPE, From c79c0c7057f94de55426b7dcc6bfa028158b1784 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Fri, 15 Mar 2024 17:21:36 +0800 Subject: [PATCH 3/4] fix --- .../src/test/java/org/apache/paimon/TestFileStore.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index c39cc6a5f4b8..29e8b57a2030 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -44,6 +44,7 @@ import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.ExpireSnapshots; import org.apache.paimon.table.ExpireSnapshotsImpl; @@ -107,7 +108,14 @@ private TestFileStore( super( FileIOFinder.find(new Path(root)), schemaManager(root, options), - schemaManager(root, options).schema(0), + new TableSchema( + 0L, + valueType.getFields(), + valueType.getFieldCount(), + partitionType.getFieldNames(), + keyType.getFieldNames(), + Collections.emptyMap(), + null), false, options, partitionType, From a4a19ebfca52d0a27286c46d39dca8cb89330f3b Mon Sep 17 00:00:00 2001 From: Jingsong Date: Fri, 15 Mar 2024 18:00:26 +0800 Subject: [PATCH 4/4] fix --- .../java/org/apache/paimon/TestFileStore.java | 29 ++++++++++++------- .../paimon/operation/FileDeletionTest.java | 19 +++++++----- .../paimon/operation/FileStoreCommitTest.java | 23 ++++++++------- .../operation/FileStoreExpireTestBase.java | 3 +- .../operation/KeyValueFileStoreReadTest.java | 8 +++-- .../operation/KeyValueFileStoreScanTest.java | 3 +- 6 files changed, 51 insertions(+), 34 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 29e8b57a2030..43e0297fd921 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -104,18 +104,21 @@ private TestFileStore( RowType keyType, RowType valueType, KeyValueFieldsExtractor keyValueFieldsExtractor, - MergeFunctionFactory mfFactory) { + MergeFunctionFactory mfFactory, + TableSchema tableSchema) { super( FileIOFinder.find(new Path(root)), schemaManager(root, options), - new TableSchema( - 0L, - valueType.getFields(), - valueType.getFieldCount(), - partitionType.getFieldNames(), - keyType.getFieldNames(), - Collections.emptyMap(), - null), + tableSchema != null + ? tableSchema + : new TableSchema( + 0L, + valueType.getFields(), + valueType.getFieldCount(), + partitionType.getFieldNames(), + keyType.getFieldNames(), + Collections.emptyMap(), + null), false, options, partitionType, @@ -575,6 +578,7 @@ public static class Builder { private final RowType valueType; private final KeyValueFieldsExtractor keyValueFieldsExtractor; private final MergeFunctionFactory mfFactory; + private final TableSchema tableSchema; private CoreOptions.ChangelogProducer changelogProducer; @@ -586,7 +590,8 @@ public Builder( RowType keyType, RowType valueType, KeyValueFieldsExtractor keyValueFieldsExtractor, - MergeFunctionFactory mfFactory) { + MergeFunctionFactory mfFactory, + TableSchema tableSchema) { this.format = format; this.root = root; this.numBuckets = numBuckets; @@ -595,6 +600,7 @@ public Builder( this.valueType = valueType; this.keyValueFieldsExtractor = keyValueFieldsExtractor; this.mfFactory = mfFactory; + this.tableSchema = tableSchema; this.changelogProducer = CoreOptions.ChangelogProducer.NONE; } @@ -632,7 +638,8 @@ public TestFileStore build() { keyType, valueType, keyValueFieldsExtractor, - mfFactory); + mfFactory, + tableSchema); } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 49e55ee4e1f6..9994c0809a88 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -36,6 +36,7 @@ import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.RecordWriter; @@ -664,13 +665,14 @@ private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode mode, int } SchemaManager schemaManager = new SchemaManager(fileIO, new Path(root)); - schemaManager.createTable( - new Schema( - rowType.getFields(), - partitionType.getFieldNames(), - TestKeyValueGenerator.getPrimaryKeys(mode), - Collections.emptyMap(), - null)); + TableSchema tableSchema = + schemaManager.createTable( + new Schema( + rowType.getFields(), + partitionType.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys(mode), + Collections.emptyMap(), + null)); return new TestFileStore.Builder( "avro", @@ -680,7 +682,8 @@ private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode mode, int TestKeyValueGenerator.KEY_TYPE, rowType, TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, - DeduplicateMergeFunction.factory()) + DeduplicateMergeFunction.factory(), + tableSchema) .changelogProducer(changelogProducer) .build(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 488f74f67e64..b0cec3f445da 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -35,6 +35,7 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.ColStats; import org.apache.paimon.stats.Statistics; import org.apache.paimon.stats.StatsFileHandler; @@ -856,15 +857,16 @@ private TestFileStore createStore( ? FailingFileIO.getFailingPath(failingName, tempDir.toString()) : TraceableFileIO.SCHEME + "://" + tempDir.toString(); Path path = new Path(tempDir.toUri()); - SchemaUtils.forceCommit( - new SchemaManager(new LocalFileIO(), path), - new Schema( - TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), - TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), - TestKeyValueGenerator.getPrimaryKeys( - TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), - Collections.emptyMap(), - null)); + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(new LocalFileIO(), path), + new Schema( + TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), + TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys( + TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), + Collections.emptyMap(), + null)); return new TestFileStore.Builder( "avro", root, @@ -873,7 +875,8 @@ private TestFileStore createStore( TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.DEFAULT_ROW_TYPE, TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, - DeduplicateMergeFunction.factory()) + DeduplicateMergeFunction.factory(), + tableSchema) .changelogProducer(changelogProducer) .build(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java index d9b3469f9a16..5ebe76ed608d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java @@ -86,7 +86,8 @@ private TestFileStore createStore() { TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.DEFAULT_ROW_TYPE, TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, - DeduplicateMergeFunction.factory()) + DeduplicateMergeFunction.factory(), + null) .changelogProducer(changelogProducer) .build(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java index a3c0142e70a0..07bf705e2bba 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreReadTest.java @@ -268,7 +268,7 @@ private TestFileStore createStore( Path path = new Path(tempDir.toUri()); SchemaManager schemaManager = new SchemaManager(FileIOFinder.find(path), path); boolean valueCountMode = mfFactory.create() instanceof TestValueCountMergeFunction; - schemaManager.createTable( + Schema schema = new Schema( (valueCountMode ? keyType : valueType).getFields(), partitionType.getFieldNames(), @@ -280,7 +280,8 @@ private TestFileStore createStore( partitionType.getFieldNames().stream()) .collect(Collectors.toList()), Collections.emptyMap(), - null)); + null); + TableSchema tableSchema = schemaManager.createTable(schema); return new TestFileStore.Builder( "avro", tempDir.toString(), @@ -289,7 +290,8 @@ private TestFileStore createStore( keyType, valueType, extractor, - mfFactory) + mfFactory, + tableSchema) .build(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 74dee018ef9c..7f1fc0d3c90e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -76,7 +76,8 @@ public void beforeEach() throws Exception { TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.DEFAULT_ROW_TYPE, TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, - DeduplicateMergeFunction.factory()) + DeduplicateMergeFunction.factory(), + null) .build(); snapshotManager = store.snapshotManager();