diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 399ee6c3c42c..3caa72c8dc60 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -286,6 +286,9 @@ private Pair> doPlan( Collection mergedEntries = ManifestEntry.mergeEntries(entries); long skippedByPartitionAndStats = startDataFiles - cntEntries.get(); for (ManifestEntry file : mergedEntries) { + // cache the data schema + schemaManager.schema(file.file().schemaId()); + if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) { String partInfo = partitionType.getFieldCount() > 0 diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index a6d274688aea..0ee9206eccc4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -58,6 +58,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -80,9 +81,12 @@ public class SchemaManager implements Serializable { @Nullable private transient Lock lock; + private final Map cache; + public SchemaManager(FileIO fileIO, Path tableRoot) { this.fileIO = fileIO; this.tableRoot = tableRoot; + this.cache = new ConcurrentHashMap<>(); } public SchemaManager withLock(@Nullable Lock lock) { @@ -363,6 +367,7 @@ public TableSchema commitChanges(List changes) try { boolean success = commit(newSchema); if (success) { + cache.put(newSchema.id(), newSchema); return newSchema; } } catch (Exception e) { @@ -467,6 +472,10 @@ boolean commit(TableSchema newSchema) throws Exception { /** Read schema for schema id. */ public TableSchema schema(long id) { + return cache.computeIfAbsent(id, this::loadSchemaFromFile); + } + + private TableSchema loadSchemaFromFile(Long id) { try { return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(toSchemaPath(id)), TableSchema.class); } catch (IOException e) { @@ -507,6 +516,7 @@ public Path branchSchemaPath(String branchName, long schemaId) { */ public void deleteSchema(long schemaId) { fileIO.deleteQuietly(toSchemaPath(schemaId)); + this.cache.remove(schemaId); } public static void checkAlterTableOption(String key) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index bdc81f3c1d9e..0e608fb42a4a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -82,6 +82,8 @@ abstract class AbstractFileStoreTable implements FileStoreTable { protected final TableSchema tableSchema; protected final CatalogEnvironment catalogEnvironment; + protected final SchemaManager tableSchemaManager; + protected AbstractFileStoreTable( FileIO fileIO, Path path, @@ -97,6 +99,7 @@ protected AbstractFileStoreTable( } this.tableSchema = tableSchema; this.catalogEnvironment = catalogEnvironment; + tableSchemaManager = new SchemaManager(fileIO, path); } @Override @@ -259,7 +262,7 @@ public FileStoreTable copyWithLatestSchema() { } protected SchemaManager schemaManager() { - return new SchemaManager(fileIO(), path); + return tableSchemaManager; } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 9c731ffea794..fb4dc7623f49 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -94,6 +94,23 @@ public void afterEach() { } } + @Test + public void testCache() throws Exception { + Path path = new Path(tempDir.toString()); + SchemaManager manager = new SchemaManager(FileIOFinder.find(path), path); + // schema-0 + manager.createTable(schema); + // schema-1 + manager.commitChanges(SchemaChange.setOption("aaa", "bbb")); + // schema-2 + manager.commitChanges(SchemaChange.setOption("ccc", "ddd")); + + SchemaManager newSchemaManager = new SchemaManager(FileIOFinder.find(path), path); + Optional schemaOpt1 = newSchemaManager.latest(); + Optional schemaOpt2 = newSchemaManager.latest(); + assertThat(schemaOpt1.get()).isSameAs(schemaOpt2.get()); + } + @Test public void testCreateTable() throws Exception { TableSchema tableSchema = retryArtificialException(() -> manager.createTable(schema));