From 6abb6424be24b03bf92a5038e176cc2696a16152 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Sun, 3 Mar 2024 14:16:48 +0800 Subject: [PATCH 1/9] add schema cache --- .../apache/paimon/schema/SchemaManager.java | 25 +++++++++++++- .../paimon/table/AbstractFileStoreTable.java | 19 ++++++++++- .../paimon/schema/SchemaManagerTest.java | 33 +++++++++++++++++++ 3 files changed, 75 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index a6d274688aea..0c7439db970f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -45,6 +45,9 @@ import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; + import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -80,9 +83,17 @@ public class SchemaManager implements Serializable { @Nullable private transient Lock lock; + private transient Cache cache; + public SchemaManager(FileIO fileIO, Path tableRoot) { + this(fileIO, tableRoot, new HashMap<>()); + } + + public SchemaManager(FileIO fileIO, Path tableRoot, Map cache) { this.fileIO = fileIO; this.tableRoot = tableRoot; + this.cache = Caffeine.newBuilder().maximumSize(10).build(); + this.cache.putAll(cache); } public SchemaManager withLock(@Nullable Lock lock) { @@ -124,6 +135,11 @@ public List listAllIds() { } } + @VisibleForTesting + Cache getCachedSchema() { + return cache; + } + /** Create a new schema from {@link Schema}. */ public TableSchema createTable(Schema schema) throws Exception { while (true) { @@ -467,8 +483,14 @@ boolean commit(TableSchema newSchema) throws Exception { /** Read schema for schema id. */ public TableSchema schema(long id) { + return this.cache.get(id, this::loadSchemaFromFile); + } + + private TableSchema loadSchemaFromFile(Long id) { try { - return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(toSchemaPath(id)), TableSchema.class); + Path schemaPath = toSchemaPath(id); + String schemaJson = fileIO.readFileUtf8(schemaPath); + return JsonSerdeUtil.fromJson(schemaJson, TableSchema.class); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -507,6 +529,7 @@ public Path branchSchemaPath(String branchName, long schemaId) { */ public void deleteSchema(long schemaId) { fileIO.deleteQuietly(toSchemaPath(schemaId)); + this.cache.invalidate(schemaId); } public static void checkAlterTableOption(String key) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index bdc81f3c1d9e..c378de192a4c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -55,12 +55,15 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.jetbrains.annotations.NotNull; + import javax.annotation.Nullable; import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -82,6 +85,8 @@ abstract class AbstractFileStoreTable implements FileStoreTable { protected final TableSchema tableSchema; protected final CatalogEnvironment catalogEnvironment; + protected final Map schemaCache; + protected AbstractFileStoreTable( FileIO fileIO, Path path, @@ -97,6 +102,18 @@ protected AbstractFileStoreTable( } this.tableSchema = tableSchema; this.catalogEnvironment = catalogEnvironment; + schemaCache = loadSchemaCache(fileIO, path); + } + + @NotNull + private Map loadSchemaCache(FileIO fileIO, Path path) { + final Map schemaCache; + schemaCache = new LinkedHashMap<>(); + SchemaManager schemaManager = new SchemaManager(fileIO, path); + for (TableSchema schema : schemaManager.listAll()) { + schemaCache.put(schema.id(), schema); + } + return schemaCache; } @Override @@ -259,7 +276,7 @@ public FileStoreTable copyWithLatestSchema() { } protected SchemaManager schemaManager() { - return new SchemaManager(fileIO(), path); + return new SchemaManager(fileIO(), path, schemaCache); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 9c731ffea794..9df2e07aece8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -30,6 +30,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.utils.FailingFileIO; +import org.apache.paimon.utils.InstantiationUtil; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -94,6 +95,38 @@ public void afterEach() { } } + @Test + public void testCache() throws Exception { + + // schema-0 + manager.createTable(schema); + + // schema-1 + manager.commitChanges(SchemaChange.setOption("aaa", "bbb")); + + // schema-2 + manager.commitChanges(SchemaChange.setOption("ccc", "ddd")); + + Optional latest = manager.latest(); + assertThat(latest).isPresent(); + assertThat(latest.get().options()).containsKey("ccc"); + assertThat(latest.get().options()).containsKey("aaa"); + + assertThat(manager.getCachedSchema().asMap()).hasSize(3); + + manager.deleteSchema(1L); + Map cachedSchema = manager.getCachedSchema().asMap(); + assertThat(cachedSchema).hasSize(2); + TableSchema tableSchema = cachedSchema.get(2L); + assertThat(tableSchema.options()).containsKey("ccc"); + + byte[] bytes = InstantiationUtil.serializeObject(manager); + SchemaManager schemaManager = + InstantiationUtil.deserializeObject( + bytes, Thread.currentThread().getContextClassLoader()); + assertThat(schemaManager.getCachedSchema().asMap()).isEqualTo(cachedSchema); + } + @Test public void testCreateTable() throws Exception { TableSchema tableSchema = retryArtificialException(() -> manager.createTable(schema)); From a013cadb5335d4357c86b37467ad858f7d50631e Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Mon, 4 Mar 2024 21:18:56 +0800 Subject: [PATCH 2/9] fix test --- .../apache/paimon/schema/SchemaManager.java | 1 + .../paimon/table/AbstractFileStoreTable.java | 2 -- .../paimon/schema/SchemaManagerTest.java | 25 ++++++------------- .../paimon/table/FileStoreTableTestBase.java | 15 +++++++++++ 4 files changed, 24 insertions(+), 19 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 0c7439db970f..c665c7930fd7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -379,6 +379,7 @@ public TableSchema commitChanges(List changes) try { boolean success = commit(newSchema); if (success) { + cache.put(newSchema.id(),newSchema); return newSchema; } } catch (Exception e) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index c378de192a4c..15ac397fcfa1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -54,11 +54,9 @@ import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; - import org.jetbrains.annotations.NotNull; import javax.annotation.Nullable; - import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 9df2e07aece8..944b13b11ea7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -22,6 +22,7 @@ import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DoubleType; @@ -97,34 +98,24 @@ public void afterEach() { @Test public void testCache() throws Exception { + Path path = new Path(tempDir.toString()); + SchemaManager manager = new SchemaManager(FileIOFinder.find(path), path); // schema-0 manager.createTable(schema); - // schema-1 manager.commitChanges(SchemaChange.setOption("aaa", "bbb")); - // schema-2 manager.commitChanges(SchemaChange.setOption("ccc", "ddd")); - Optional latest = manager.latest(); - assertThat(latest).isPresent(); - assertThat(latest.get().options()).containsKey("ccc"); - assertThat(latest.get().options()).containsKey("aaa"); - - assertThat(manager.getCachedSchema().asMap()).hasSize(3); + Cache cachedSchema = manager.getCachedSchema(); + assertThat(cachedSchema.asMap()).hasSize(3); manager.deleteSchema(1L); - Map cachedSchema = manager.getCachedSchema().asMap(); - assertThat(cachedSchema).hasSize(2); - TableSchema tableSchema = cachedSchema.get(2L); + Map map = cachedSchema.asMap(); + assertThat(map).hasSize(2); + TableSchema tableSchema = map.get(2L); assertThat(tableSchema.options()).containsKey("ccc"); - - byte[] bytes = InstantiationUtil.serializeObject(manager); - SchemaManager schemaManager = - InstantiationUtil.deserializeObject( - bytes, Thread.currentThread().getContextClassLoader()); - assertThat(schemaManager.getCachedSchema().asMap()).isEqualTo(cachedSchema); } @Test diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 4cd019568c41..b37e6a60188a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -64,6 +64,7 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.InstantiationUtil; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; import org.apache.paimon.utils.TraceableFileIO; @@ -969,6 +970,8 @@ public void testCreateBranch() throws Exception { assertThat(branchSchema.equals(schema0)).isTrue(); } + + @Test public void testUnsupportedBranchName() throws Exception { FileStoreTable table = createFileStoreTable(); @@ -1506,4 +1509,16 @@ protected void generateBranch(FileStoreTable table) throws Exception { BATCH_ROW_TO_STRING)) .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset"); } + + @Test + public void testSerAndDer() throws Exception { + FileStoreTable originFileStoreTable = createFileStoreTable(); + + byte[] bytes = InstantiationUtil.serializeObject(originFileStoreTable); + AbstractFileStoreTable serializedFileStoreTable = + InstantiationUtil.deserializeObject( + bytes, Thread.currentThread().getContextClassLoader()); + assertThat(serializedFileStoreTable.schemaCache.size()).isEqualTo(1); + assertThat(serializedFileStoreTable.schemaCache).containsOnlyKeys(0L); + } } From 57aa21325e1cd594c6a2e774a0de76464e877322 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Mon, 4 Mar 2024 22:41:30 +0800 Subject: [PATCH 3/9] fix test --- .../src/main/java/org/apache/paimon/schema/SchemaManager.java | 2 +- .../java/org/apache/paimon/table/AbstractFileStoreTable.java | 2 ++ .../test/java/org/apache/paimon/schema/SchemaManagerTest.java | 4 ++-- .../java/org/apache/paimon/table/FileStoreTableTestBase.java | 2 -- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index c665c7930fd7..67b973c6c60a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -379,7 +379,7 @@ public TableSchema commitChanges(List changes) try { boolean success = commit(newSchema); if (success) { - cache.put(newSchema.id(),newSchema); + cache.put(newSchema.id(), newSchema); return newSchema; } } catch (Exception e) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 15ac397fcfa1..c378de192a4c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -54,9 +54,11 @@ import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; + import org.jetbrains.annotations.NotNull; import javax.annotation.Nullable; + import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 944b13b11ea7..cf95e60742a1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -22,7 +22,6 @@ import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DoubleType; @@ -31,7 +30,8 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.utils.FailingFileIO; -import org.apache.paimon.utils.InstantiationUtil; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index b37e6a60188a..7c506a25d0cc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -970,8 +970,6 @@ public void testCreateBranch() throws Exception { assertThat(branchSchema.equals(schema0)).isTrue(); } - - @Test public void testUnsupportedBranchName() throws Exception { FileStoreTable table = createFileStoreTable(); From 0bad09cb560805a7b4f8792e2fb13d7084c08b79 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Tue, 5 Mar 2024 11:37:53 +0800 Subject: [PATCH 4/9] fix test --- .../apache/paimon/schema/SchemaManager.java | 25 ++++++++++--------- .../paimon/table/AbstractFileStoreTable.java | 5 ++-- .../paimon/schema/SchemaManagerTest.java | 11 +++----- 3 files changed, 19 insertions(+), 22 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 67b973c6c60a..712cce75d6fa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -45,9 +45,6 @@ import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; - import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -61,6 +58,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -81,19 +79,18 @@ public class SchemaManager implements Serializable { private final FileIO fileIO; private final Path tableRoot; - @Nullable private transient Lock lock; + private transient Lock lock; - private transient Cache cache; + private final Map cache; public SchemaManager(FileIO fileIO, Path tableRoot) { - this(fileIO, tableRoot, new HashMap<>()); + this(fileIO, tableRoot, new ConcurrentHashMap<>()); } public SchemaManager(FileIO fileIO, Path tableRoot, Map cache) { this.fileIO = fileIO; this.tableRoot = tableRoot; - this.cache = Caffeine.newBuilder().maximumSize(10).build(); - this.cache.putAll(cache); + this.cache = cache; } public SchemaManager withLock(@Nullable Lock lock) { @@ -136,7 +133,7 @@ public List listAllIds() { } @VisibleForTesting - Cache getCachedSchema() { + Map getCachedSchema() { return cache; } @@ -379,7 +376,9 @@ public TableSchema commitChanges(List changes) try { boolean success = commit(newSchema); if (success) { - cache.put(newSchema.id(), newSchema); + if (cache != null) { + cache.put(newSchema.id(), newSchema); + } return newSchema; } } catch (Exception e) { @@ -484,7 +483,7 @@ boolean commit(TableSchema newSchema) throws Exception { /** Read schema for schema id. */ public TableSchema schema(long id) { - return this.cache.get(id, this::loadSchemaFromFile); + return cache.computeIfAbsent(id, this::loadSchemaFromFile); } private TableSchema loadSchemaFromFile(Long id) { @@ -530,7 +529,9 @@ public Path branchSchemaPath(String branchName, long schemaId) { */ public void deleteSchema(long schemaId) { fileIO.deleteQuietly(toSchemaPath(schemaId)); - this.cache.invalidate(schemaId); + if (cache != null) { + this.cache.remove(schemaId); + } } public static void checkAlterTableOption(String key) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index c378de192a4c..9b384b5bf05c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -63,11 +63,11 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import static org.apache.paimon.CoreOptions.PATH; @@ -107,8 +107,7 @@ protected AbstractFileStoreTable( @NotNull private Map loadSchemaCache(FileIO fileIO, Path path) { - final Map schemaCache; - schemaCache = new LinkedHashMap<>(); + Map schemaCache = new ConcurrentHashMap<>(); SchemaManager schemaManager = new SchemaManager(fileIO, path); for (TableSchema schema : schemaManager.listAll()) { schemaCache.put(schema.id(), schema); diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index cf95e60742a1..4034e39f3219 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -31,8 +31,6 @@ import org.apache.paimon.types.VarCharType; import org.apache.paimon.utils.FailingFileIO; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -108,13 +106,12 @@ public void testCache() throws Exception { // schema-2 manager.commitChanges(SchemaChange.setOption("ccc", "ddd")); - Cache cachedSchema = manager.getCachedSchema(); - assertThat(cachedSchema.asMap()).hasSize(3); + Map cachedSchema = manager.getCachedSchema(); + assertThat(cachedSchema).hasSize(3); manager.deleteSchema(1L); - Map map = cachedSchema.asMap(); - assertThat(map).hasSize(2); - TableSchema tableSchema = map.get(2L); + assertThat(cachedSchema).hasSize(2); + TableSchema tableSchema = cachedSchema.get(2L); assertThat(tableSchema.options()).containsKey("ccc"); } From 96082634deba0b084a1e118860d54f744e25c641 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Sun, 10 Mar 2024 12:31:43 +0800 Subject: [PATCH 5/9] refactor --- .../apache/paimon/schema/SchemaManager.java | 16 ++++++++-------- .../paimon/table/AbstractFileStoreTable.java | 18 +++--------------- .../paimon/schema/SchemaManagerTest.java | 5 +++-- .../paimon/table/FileStoreTableTestBase.java | 11 +++++++---- 4 files changed, 21 insertions(+), 29 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 712cce75d6fa..04106822e696 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -58,7 +58,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -79,12 +78,12 @@ public class SchemaManager implements Serializable { private final FileIO fileIO; private final Path tableRoot; - private transient Lock lock; + @Nullable private transient Lock lock; private final Map cache; public SchemaManager(FileIO fileIO, Path tableRoot) { - this(fileIO, tableRoot, new ConcurrentHashMap<>()); + this(fileIO, tableRoot, null); } public SchemaManager(FileIO fileIO, Path tableRoot, Map cache) { @@ -133,7 +132,7 @@ public List listAllIds() { } @VisibleForTesting - Map getCachedSchema() { + public Map getCachedSchema() { return cache; } @@ -483,14 +482,15 @@ boolean commit(TableSchema newSchema) throws Exception { /** Read schema for schema id. */ public TableSchema schema(long id) { - return cache.computeIfAbsent(id, this::loadSchemaFromFile); + if (cache != null) { + return cache.computeIfAbsent(id, this::loadSchemaFromFile); + } + return loadSchemaFromFile(id); } private TableSchema loadSchemaFromFile(Long id) { try { - Path schemaPath = toSchemaPath(id); - String schemaJson = fileIO.readFileUtf8(schemaPath); - return JsonSerdeUtil.fromJson(schemaJson, TableSchema.class); + return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(toSchemaPath(id)), TableSchema.class); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 9b384b5bf05c..568988ac6082 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -55,8 +55,6 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; -import org.jetbrains.annotations.NotNull; - import javax.annotation.Nullable; import java.io.IOException; @@ -85,7 +83,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable { protected final TableSchema tableSchema; protected final CatalogEnvironment catalogEnvironment; - protected final Map schemaCache; + protected final SchemaManager tableSchemaManager; protected AbstractFileStoreTable( FileIO fileIO, @@ -102,17 +100,7 @@ protected AbstractFileStoreTable( } this.tableSchema = tableSchema; this.catalogEnvironment = catalogEnvironment; - schemaCache = loadSchemaCache(fileIO, path); - } - - @NotNull - private Map loadSchemaCache(FileIO fileIO, Path path) { - Map schemaCache = new ConcurrentHashMap<>(); - SchemaManager schemaManager = new SchemaManager(fileIO, path); - for (TableSchema schema : schemaManager.listAll()) { - schemaCache.put(schema.id(), schema); - } - return schemaCache; + tableSchemaManager = new SchemaManager(fileIO, path, new ConcurrentHashMap<>()); } @Override @@ -275,7 +263,7 @@ public FileStoreTable copyWithLatestSchema() { } protected SchemaManager schemaManager() { - return new SchemaManager(fileIO(), path, schemaCache); + return tableSchemaManager; } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 4034e39f3219..42f92c4745dd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -47,6 +47,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -97,8 +98,8 @@ public void afterEach() { @Test public void testCache() throws Exception { Path path = new Path(tempDir.toString()); - SchemaManager manager = new SchemaManager(FileIOFinder.find(path), path); - + SchemaManager manager = + new SchemaManager(FileIOFinder.find(path), path, new ConcurrentHashMap<>()); // schema-0 manager.createTable(schema); // schema-1 diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 7c506a25d0cc..a3c6a64c8c40 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1510,13 +1510,16 @@ protected void generateBranch(FileStoreTable table) throws Exception { @Test public void testSerAndDer() throws Exception { - FileStoreTable originFileStoreTable = createFileStoreTable(); - + AbstractFileStoreTable originFileStoreTable = + (AbstractFileStoreTable) createFileStoreTable(); + originFileStoreTable.schemaManager().latest(); byte[] bytes = InstantiationUtil.serializeObject(originFileStoreTable); AbstractFileStoreTable serializedFileStoreTable = InstantiationUtil.deserializeObject( bytes, Thread.currentThread().getContextClassLoader()); - assertThat(serializedFileStoreTable.schemaCache.size()).isEqualTo(1); - assertThat(serializedFileStoreTable.schemaCache).containsOnlyKeys(0L); + assertThat(serializedFileStoreTable.tableSchemaManager.getCachedSchema().size()) + .isEqualTo(1); + assertThat(serializedFileStoreTable.tableSchemaManager.getCachedSchema()) + .containsOnlyKeys(0L); } } From 85d4ab6d4d502177bab958a3dfaa43043e2c2a72 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Mon, 11 Mar 2024 18:22:41 +0800 Subject: [PATCH 6/9] refactor --- .../org/apache/paimon/schema/SchemaManager.java | 6 ++++-- .../apache/paimon/schema/SchemaManagerTest.java | 14 +++++--------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 04106822e696..01281a28e265 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -47,7 +47,6 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; - import java.io.IOException; import java.io.Serializable; import java.io.UncheckedIOException; @@ -58,6 +57,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -83,13 +83,15 @@ public class SchemaManager implements Serializable { private final Map cache; public SchemaManager(FileIO fileIO, Path tableRoot) { - this(fileIO, tableRoot, null); + this(fileIO, tableRoot, new ConcurrentHashMap<>()); } public SchemaManager(FileIO fileIO, Path tableRoot, Map cache) { this.fileIO = fileIO; this.tableRoot = tableRoot; this.cache = cache; + //cache + listAll(); } public SchemaManager withLock(@Nullable Lock lock) { diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 42f92c4745dd..07ebce6b2208 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -30,7 +30,6 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.utils.FailingFileIO; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -47,7 +46,6 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -99,7 +97,7 @@ public void afterEach() { public void testCache() throws Exception { Path path = new Path(tempDir.toString()); SchemaManager manager = - new SchemaManager(FileIOFinder.find(path), path, new ConcurrentHashMap<>()); + new SchemaManager(FileIOFinder.find(path), path); // schema-0 manager.createTable(schema); // schema-1 @@ -107,13 +105,11 @@ public void testCache() throws Exception { // schema-2 manager.commitChanges(SchemaChange.setOption("ccc", "ddd")); - Map cachedSchema = manager.getCachedSchema(); - assertThat(cachedSchema).hasSize(3); + SchemaManager newSchemaManager = new SchemaManager(FileIOFinder.find(path), path); + Optional schemaOpt1 = newSchemaManager.latest(); + Optional schemaOpt2 = newSchemaManager.latest(); + assertThat(schemaOpt1.get()).isSameAs(schemaOpt2.get()); - manager.deleteSchema(1L); - assertThat(cachedSchema).hasSize(2); - TableSchema tableSchema = cachedSchema.get(2L); - assertThat(tableSchema.options()).containsKey("ccc"); } @Test From 12dae7c8ca1d6b117c09ee534084c21a123cd65a Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Mon, 11 Mar 2024 18:23:16 +0800 Subject: [PATCH 7/9] refactor --- .../main/java/org/apache/paimon/schema/SchemaManager.java | 3 ++- .../java/org/apache/paimon/schema/SchemaManagerTest.java | 7 +++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 01281a28e265..d5e0033e2691 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -47,6 +47,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; + import java.io.IOException; import java.io.Serializable; import java.io.UncheckedIOException; @@ -90,7 +91,7 @@ public SchemaManager(FileIO fileIO, Path tableRoot, Map cache this.fileIO = fileIO; this.tableRoot = tableRoot; this.cache = cache; - //cache + // cache listAll(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 07ebce6b2208..fb4dc7623f49 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -30,6 +30,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.utils.FailingFileIO; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -96,8 +97,7 @@ public void afterEach() { @Test public void testCache() throws Exception { Path path = new Path(tempDir.toString()); - SchemaManager manager = - new SchemaManager(FileIOFinder.find(path), path); + SchemaManager manager = new SchemaManager(FileIOFinder.find(path), path); // schema-0 manager.createTable(schema); // schema-1 @@ -106,10 +106,9 @@ public void testCache() throws Exception { manager.commitChanges(SchemaChange.setOption("ccc", "ddd")); SchemaManager newSchemaManager = new SchemaManager(FileIOFinder.find(path), path); - Optional schemaOpt1 = newSchemaManager.latest(); + Optional schemaOpt1 = newSchemaManager.latest(); Optional schemaOpt2 = newSchemaManager.latest(); assertThat(schemaOpt1.get()).isSameAs(schemaOpt2.get()); - } @Test From 701c6711e6788fe3bd230f60464244be1e6abbd6 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Tue, 12 Mar 2024 11:17:34 +0800 Subject: [PATCH 8/9] refactor --- .../apache/paimon/schema/SchemaManager.java | 26 +++---------------- .../paimon/table/AbstractFileStoreTable.java | 3 +-- .../paimon/table/FileStoreTableTestBase.java | 16 ------------ 3 files changed, 5 insertions(+), 40 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index d5e0033e2691..0ee9206eccc4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -84,15 +84,9 @@ public class SchemaManager implements Serializable { private final Map cache; public SchemaManager(FileIO fileIO, Path tableRoot) { - this(fileIO, tableRoot, new ConcurrentHashMap<>()); - } - - public SchemaManager(FileIO fileIO, Path tableRoot, Map cache) { this.fileIO = fileIO; this.tableRoot = tableRoot; - this.cache = cache; - // cache - listAll(); + this.cache = new ConcurrentHashMap<>(); } public SchemaManager withLock(@Nullable Lock lock) { @@ -134,11 +128,6 @@ public List listAllIds() { } } - @VisibleForTesting - public Map getCachedSchema() { - return cache; - } - /** Create a new schema from {@link Schema}. */ public TableSchema createTable(Schema schema) throws Exception { while (true) { @@ -378,9 +367,7 @@ public TableSchema commitChanges(List changes) try { boolean success = commit(newSchema); if (success) { - if (cache != null) { - cache.put(newSchema.id(), newSchema); - } + cache.put(newSchema.id(), newSchema); return newSchema; } } catch (Exception e) { @@ -485,10 +472,7 @@ boolean commit(TableSchema newSchema) throws Exception { /** Read schema for schema id. */ public TableSchema schema(long id) { - if (cache != null) { - return cache.computeIfAbsent(id, this::loadSchemaFromFile); - } - return loadSchemaFromFile(id); + return cache.computeIfAbsent(id, this::loadSchemaFromFile); } private TableSchema loadSchemaFromFile(Long id) { @@ -532,9 +516,7 @@ public Path branchSchemaPath(String branchName, long schemaId) { */ public void deleteSchema(long schemaId) { fileIO.deleteQuietly(toSchemaPath(schemaId)); - if (cache != null) { - this.cache.remove(schemaId); - } + this.cache.remove(schemaId); } public static void checkAlterTableOption(String key) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 568988ac6082..0e608fb42a4a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -65,7 +65,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import static org.apache.paimon.CoreOptions.PATH; @@ -100,7 +99,7 @@ protected AbstractFileStoreTable( } this.tableSchema = tableSchema; this.catalogEnvironment = catalogEnvironment; - tableSchemaManager = new SchemaManager(fileIO, path, new ConcurrentHashMap<>()); + tableSchemaManager = new SchemaManager(fileIO, path); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index a3c6a64c8c40..4cd019568c41 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -64,7 +64,6 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; -import org.apache.paimon.utils.InstantiationUtil; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; import org.apache.paimon.utils.TraceableFileIO; @@ -1507,19 +1506,4 @@ protected void generateBranch(FileStoreTable table) throws Exception { BATCH_ROW_TO_STRING)) .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset"); } - - @Test - public void testSerAndDer() throws Exception { - AbstractFileStoreTable originFileStoreTable = - (AbstractFileStoreTable) createFileStoreTable(); - originFileStoreTable.schemaManager().latest(); - byte[] bytes = InstantiationUtil.serializeObject(originFileStoreTable); - AbstractFileStoreTable serializedFileStoreTable = - InstantiationUtil.deserializeObject( - bytes, Thread.currentThread().getContextClassLoader()); - assertThat(serializedFileStoreTable.tableSchemaManager.getCachedSchema().size()) - .isEqualTo(1); - assertThat(serializedFileStoreTable.tableSchemaManager.getCachedSchema()) - .containsOnlyKeys(0L); - } } From 3ae98153e0fdb6b2f7e62783965c2e9206e56e1b Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Tue, 12 Mar 2024 22:48:58 +0800 Subject: [PATCH 9/9] cache the schema of data file for reader --- .../org/apache/paimon/operation/AbstractFileStoreScan.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 399ee6c3c42c..3caa72c8dc60 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -286,6 +286,9 @@ private Pair> doPlan( Collection mergedEntries = ManifestEntry.mergeEntries(entries); long skippedByPartitionAndStats = startDataFiles - cntEntries.get(); for (ManifestEntry file : mergedEntries) { + // cache the data schema + schemaManager.schema(file.file().schemaId()); + if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) { String partInfo = partitionType.getFieldCount() > 0