From c8ef3420ac3ffa895c5de1aa2cd053405cb862ca Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Wed, 11 Dec 2024 11:42:44 +0800 Subject: [PATCH 1/3] update --- .../paimon/catalog/AbstractCatalog.java | 8 +-- .../apache/paimon/catalog/CachingCatalog.java | 41 +++++------ .../org/apache/paimon/catalog/Identifier.java | 20 ++++++ .../paimon/catalog/CachingCatalogTest.java | 72 ++++++++++++------- .../procedure/DeleteBranchProcedure.java | 5 +- .../paimon/spark/DataFrameWriteTest.scala | 5 ++ .../apache/paimon/spark/PaimonSinkTest.scala | 5 ++ .../paimon/spark/SparkGenericCatalog.java | 2 +- .../procedure/DeleteBranchProcedure.java | 23 ++++-- .../apache/paimon/spark/PaimonSinkTest.scala | 5 ++ .../paimon/spark/PaimonSparkTestBase.scala | 1 - .../paimon/spark/sql/DataFrameWriteTest.scala | 6 ++ 12 files changed, 127 insertions(+), 66 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 2b277a29b835..b56fec279ab1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -381,7 +381,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException { throw new TableNotExistException(identifier); } return table; - } else if (isSpecifiedSystemTable(identifier)) { + } else if (identifier.isSystemTable()) { Table originTable = getDataOrFormatTable( new Identifier( @@ -519,12 +519,8 @@ protected void assertMainBranch(Identifier identifier) { } } - public static boolean isSpecifiedSystemTable(Identifier identifier) { - return identifier.getSystemTableName() != null; - } - protected static boolean isTableInSystemDatabase(Identifier identifier) { - return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier); + return isSystemDatabase(identifier.getDatabaseName()) || identifier.isSystemTable(); } protected static void checkNotSystemTable(Identifier identifier, String method) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index f67f19700d6e..abd6aa398f9f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -26,7 +26,6 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.system.SystemTableLoader; -import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; @@ -48,7 +47,6 @@ import java.util.Map; import java.util.Optional; -import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable; import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED; import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS; import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY; @@ -56,7 +54,7 @@ import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD; import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM; import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE; -import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES; +import static org.apache.paimon.utils.Preconditions.checkNotNull; /** A {@link Catalog} to cache databases and tables and manifests. */ public class CachingCatalog extends DelegateCatalog { @@ -227,13 +225,10 @@ public Table getTable(Identifier identifier) throws TableNotExistException { return table; } - if (isSpecifiedSystemTable(identifier)) { - Identifier originIdentifier = - new Identifier( - identifier.getDatabaseName(), - identifier.getTableName(), - identifier.getBranchName(), - null); + // For system table, do not cache it directly. Instead, cache the origin table and then wrap + // it to generate the system table. + if (identifier.isSystemTable()) { + Identifier originIdentifier = identifier.toOriginTable(); Table originTable = tableCache.getIfPresent(originIdentifier); if (originTable == null) { originTable = wrapped.getTable(originIdentifier); @@ -241,12 +236,11 @@ public Table getTable(Identifier identifier) throws TableNotExistException { } table = SystemTableLoader.load( - Preconditions.checkNotNull(identifier.getSystemTableName()), + checkNotNull(identifier.getSystemTableName()), (FileStoreTable) originTable); if (table == null) { throw new TableNotExistException(identifier); } - putTableCache(identifier, table); return table; } @@ -309,7 +303,7 @@ private class TableInvalidatingRemovalListener implements RemovalListener allSystemTables(Identifier ident) { - List tables = new ArrayList<>(); - for (String type : SYSTEM_TABLES) { - tables.add(Identifier.fromString(ident.getFullName() + SYSTEM_TABLE_SPLITTER + type)); + private void tryInvalidateAttachedTables(Identifier identifier) { + if (identifier.isMainTable()) { + // invalidate cached branches + for (@NonNull Identifier i : tableCache.asMap().keySet()) { + if (identifier.getTableName().equals(i.getTableName())) { + tableCache.invalidate(i); + } + } } - return tables; } // ================================== refresh ================================================ diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java index 72da69b67b83..c5c8596c8b42 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java @@ -65,6 +65,10 @@ public Identifier(String database, String object) { this.object = object; } + public Identifier(String database, String table, @Nullable String branch) { + this(database, table, branch, null); + } + public Identifier( String database, String table, @Nullable String branch, @Nullable String systemTable) { this.database = database; @@ -119,6 +123,22 @@ public String getBranchNameOrDefault() { return systemTable; } + public boolean isMainTable() { + return getBranchName() == null && getSystemTableName() == null; + } + + public boolean isBranch() { + return getBranchName() != null && getSystemTableName() == null; + } + + public boolean isSystemTable() { + return getSystemTableName() != null; + } + + public Identifier toOriginTable() { + return new Identifier(database, getTableName(), getBranchName()); + } + private void splitObjectName() { if (table != null) { return; diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java index 7567f682ae60..098f075eccfc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java @@ -48,7 +48,6 @@ import java.io.FileNotFoundException; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -101,14 +100,49 @@ public void testInvalidateSystemTablesIfBaseTableIsModified() throws Exception { @Test public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception { - Catalog catalog = new CachingCatalog(this.catalog); + TestableCachingCatalog catalog = + new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker); Identifier tableIdent = new Identifier("db", "tbl"); catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false); Identifier sysIdent = new Identifier("db", "tbl$files"); + // get system table will only cache the origin table catalog.getTable(sysIdent); + assertThat(catalog.tableCache.asMap()).containsKey(tableIdent); + assertThat(catalog.tableCache.asMap()).doesNotContainKey(sysIdent); + // test case sensitivity + Identifier sysIdent1 = new Identifier("db", "tbl$SNAPSHOTS"); + catalog.getTable(sysIdent1); + assertThat(catalog.tableCache.asMap()).doesNotContainKey(sysIdent1); + catalog.dropTable(tableIdent, false); + assertThat(catalog.tableCache.asMap()).doesNotContainKey(tableIdent); assertThatThrownBy(() -> catalog.getTable(sysIdent)) .hasMessage("Table db.tbl does not exist."); + assertThatThrownBy(() -> catalog.getTable(sysIdent1)) + .hasMessage("Table db.tbl does not exist."); + } + + @Test + public void testInvalidateBranchIfBaseTableIsDropped() throws Exception { + TestableCachingCatalog catalog = + new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker); + Identifier tableIdent = new Identifier("db", "tbl"); + catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false); + catalog.getTable(tableIdent).createBranch("b1"); + + Identifier branchIdent = new Identifier("db", "tbl$branch_b1"); + Identifier branchSysIdent = new Identifier("db", "tbl$branch_b1$FILES"); + // get system table will only cache the origin table + catalog.getTable(branchSysIdent); + assertThat(catalog.tableCache.asMap()).containsKey(branchIdent); + assertThat(catalog.tableCache.asMap()).doesNotContainKey(branchSysIdent); + + catalog.dropTable(tableIdent, false); + assertThat(catalog.tableCache.asMap()).doesNotContainKey(branchIdent); + assertThatThrownBy(() -> catalog.getTable(branchIdent)) + .hasMessage("Table db.tbl$branch_b1 does not exist."); + assertThatThrownBy(() -> catalog.getTable(branchSysIdent)) + .hasMessage("Table db.tbl$branch_b1 does not exist."); } @Test @@ -176,7 +210,7 @@ public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog() throws Exce } @Test - public void testCacheExpirationEagerlyRemovesSysTables() throws Exception { + public void testCacheExpirationEagerlyRemovesBranch() throws Exception { TestableCachingCatalog catalog = new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker); @@ -190,26 +224,21 @@ public void testCacheExpirationEagerlyRemovesSysTables() throws Exception { assertThat(catalog.tableCache().asMap()).containsKey(tableIdent); assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION); - for (Identifier sysTable : sysTables(tableIdent)) { - catalog.getTable(sysTable); - } - assertThat(catalog.tableCache().asMap()).containsKeys(sysTables(tableIdent)); - assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf)) - .isNotEmpty() - .allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO)); + catalog.getTable(tableIdent).createBranch("b1"); + Identifier branchIdent = new Identifier("db", "tbl$branch_b1"); + catalog.getTable(branchIdent); + + assertThat(catalog.tableCache().asMap()).containsKeys(branchIdent); + assertThat(catalog.ageOf(branchIdent)).get().isEqualTo(Duration.ZERO); assertThat(catalog.remainingAgeFor(tableIdent)) .as("Loading a non-cached sys table should refresh the main table's age") .isEqualTo(Optional.of(EXPIRATION_TTL)); - // Move time forward and access already cached sys tables. + // Move time forward and access already cached branch. ticker.advance(HALF_OF_EXPIRATION); - for (Identifier sysTable : sysTables(tableIdent)) { - catalog.getTable(sysTable); - } - assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf)) - .isNotEmpty() - .allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO)); + catalog.getTable(branchIdent); + assertThat(catalog.ageOf(branchIdent)).get().isEqualTo(Duration.ZERO); assertThat(catalog.remainingAgeFor(tableIdent)) .as("Accessing a cached sys table should not affect the main table's age") @@ -218,14 +247,7 @@ public void testCacheExpirationEagerlyRemovesSysTables() throws Exception { // Move time forward so the data table drops. ticker.advance(HALF_OF_EXPIRATION); assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent); - - Arrays.stream(sysTables(tableIdent)) - .forEach( - sysTable -> - assertThat(catalog.tableCache().asMap()) - .as( - "When a data table expires, its sys tables should expire regardless of age") - .doesNotContainKeys(sysTable)); + assertThat(catalog.tableCache().asMap()).doesNotContainKey(branchIdent); } @Test diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java index c95fd62bee40..56c649028650 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java @@ -49,7 +49,10 @@ public String identifier() { }) public String[] call(ProcedureContext procedureContext, String tableId, String branchStr) throws Catalog.TableNotExistException { - catalog.getTable(Identifier.fromString(tableId)).deleteBranches(branchStr); + Identifier identifier = Identifier.fromString(tableId); + catalog.getTable(identifier).deleteBranches(branchStr); + catalog.invalidateTable( + new Identifier(identifier.getDatabaseName(), identifier.getTableName(), branchStr)); return new String[] {"Success"}; } } diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala index a3cecfc72e1d..cb449edb4ccb 100644 --- a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala +++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala @@ -18,10 +18,15 @@ package org.apache.paimon.spark +import org.apache.spark.SparkConf import org.junit.jupiter.api.Assertions class DataFrameWriteTest extends PaimonSparkTestBase { + override protected def sparkConf: SparkConf = { + super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false") + } + test("Paimon: DataFrameWrite.saveAsTable") { import testImplicits._ diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala index 18fb9e116ba4..ab4a9bcd9dbf 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark +import org.apache.spark.SparkConf import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions.{col, mean, window} @@ -27,6 +28,10 @@ import java.sql.Date class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { + override protected def sparkConf: SparkConf = { + super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false") + } + import testImplicits._ test("Paimon Sink: forEachBatch") { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java index 9957f0cdf91f..63d75a53ef2e 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java @@ -186,7 +186,7 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep @Override public void invalidateTable(Identifier ident) { // We do not need to check whether the table exists and whether - // it is an Paimon table to reduce remote service requests. + // it is a Paimon table to reduce remote service requests. sparkCatalog.invalidateTable(ident); asTableCatalog().invalidateTable(ident); } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java index e398eee0261f..4a01c33d6af1 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.spark.catalog.WithPaimonCatalog; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -61,13 +63,20 @@ public StructType outputType() { public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String branchStr = args.getString(1); - return modifyPaimonTable( - tableIdent, - table -> { - table.deleteBranches(branchStr); - InternalRow outputRow = newInternalRow(true); - return new InternalRow[] {outputRow}; - }); + InternalRow[] result = + modifyPaimonTable( + tableIdent, + table -> { + table.deleteBranches(branchStr); + InternalRow outputRow = newInternalRow(true); + return new InternalRow[] {outputRow}; + }); + ((WithPaimonCatalog) tableCatalog()) + .paimonCatalog() + .invalidateTable( + new org.apache.paimon.catalog.Identifier( + tableIdent.namespace()[0], tableIdent.name(), branchStr)); + return result; } public static ProcedureBuilder builder() { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala index 63203122ac40..61bf5524942d 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark +import org.apache.spark.SparkConf import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions.{col, mean, window} @@ -27,6 +28,10 @@ import java.sql.Date class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { + override protected def sparkConf: SparkConf = { + super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false") + } + import testImplicits._ test("Paimon Sink: forEachBatch") { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala index 605b2e6ca5f2..867b3e5e3337 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala @@ -66,7 +66,6 @@ class PaimonSparkTestBase super.sparkConf .set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName) .set("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath) - .set("spark.sql.catalog.paimon.cache-enabled", "false") .set("spark.sql.extensions", classOf[PaimonSparkSessionExtensions].getName) .set("spark.serializer", serializer) } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala index a0a94afacfb9..edd092c85ce8 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala @@ -20,6 +20,7 @@ package org.apache.paimon.spark.sql import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.types.DecimalType import org.junit.jupiter.api.Assertions @@ -27,6 +28,11 @@ import org.junit.jupiter.api.Assertions import java.sql.{Date, Timestamp} class DataFrameWriteTest extends PaimonSparkTestBase { + + override protected def sparkConf: SparkConf = { + super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false") + } + import testImplicits._ test("Paimon: DataFrameWrite.saveAsTable") { From 0029018d7a4afd9978ea0935df3222e5f4272c1a Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Wed, 11 Dec 2024 11:54:44 +0800 Subject: [PATCH 2/3] update --- .../java/org/apache/paimon/catalog/CachingCatalog.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index abd6aa398f9f..ae2378c1600b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -228,12 +228,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException { // For system table, do not cache it directly. Instead, cache the origin table and then wrap // it to generate the system table. if (identifier.isSystemTable()) { - Identifier originIdentifier = identifier.toOriginTable(); - Table originTable = tableCache.getIfPresent(originIdentifier); - if (originTable == null) { - originTable = wrapped.getTable(originIdentifier); - putTableCache(originIdentifier, originTable); - } + Table originTable = getTable(identifier.toOriginTable()); table = SystemTableLoader.load( checkNotNull(identifier.getSystemTableName()), From 29ca80a721da46ea756e5c596bb425f36127e496 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 12 Dec 2024 10:49:44 +0800 Subject: [PATCH 3/3] update for comment --- .../apache/paimon/catalog/CachingCatalog.java | 26 +++++++----- .../org/apache/paimon/catalog/Identifier.java | 4 -- .../paimon/catalog/CachingCatalogTest.java | 42 ------------------- 3 files changed, 16 insertions(+), 56 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index ae2378c1600b..e92a589d411e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -201,6 +201,9 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists) throws TableNotExistException { super.dropTable(identifier, ignoreIfNotExists); invalidateTable(identifier); + if (identifier.isMainTable()) { + invalidateAttachedTables(identifier); + } } @Override @@ -228,7 +231,13 @@ public Table getTable(Identifier identifier) throws TableNotExistException { // For system table, do not cache it directly. Instead, cache the origin table and then wrap // it to generate the system table. if (identifier.isSystemTable()) { - Table originTable = getTable(identifier.toOriginTable()); + Identifier originIdentifier = + new Identifier( + identifier.getDatabaseName(), + identifier.getTableName(), + identifier.getBranchName(), + null); + Table originTable = getTable(originIdentifier); table = SystemTableLoader.load( checkNotNull(identifier.getSystemTableName()), @@ -298,7 +307,7 @@ private class TableInvalidatingRemovalListener implements RemovalListener