diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 2b277a29b835..b56fec279ab1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -381,7 +381,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException { throw new TableNotExistException(identifier); } return table; - } else if (isSpecifiedSystemTable(identifier)) { + } else if (identifier.isSystemTable()) { Table originTable = getDataOrFormatTable( new Identifier( @@ -519,12 +519,8 @@ protected void assertMainBranch(Identifier identifier) { } } - public static boolean isSpecifiedSystemTable(Identifier identifier) { - return identifier.getSystemTableName() != null; - } - protected static boolean isTableInSystemDatabase(Identifier identifier) { - return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier); + return isSystemDatabase(identifier.getDatabaseName()) || identifier.isSystemTable(); } protected static void checkNotSystemTable(Identifier identifier, String method) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index f67f19700d6e..e92a589d411e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -26,7 +26,6 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.system.SystemTableLoader; -import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; @@ -48,7 +47,6 @@ import java.util.Map; import java.util.Optional; -import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable; import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED; import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS; import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY; @@ -56,7 +54,7 @@ import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD; import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM; import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE; -import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES; +import static org.apache.paimon.utils.Preconditions.checkNotNull; /** A {@link Catalog} to cache databases and tables and manifests. */ public class CachingCatalog extends DelegateCatalog { @@ -203,6 +201,9 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists) throws TableNotExistException { super.dropTable(identifier, ignoreIfNotExists); invalidateTable(identifier); + if (identifier.isMainTable()) { + invalidateAttachedTables(identifier); + } } @Override @@ -227,26 +228,23 @@ public Table getTable(Identifier identifier) throws TableNotExistException { return table; } - if (isSpecifiedSystemTable(identifier)) { + // For system table, do not cache it directly. Instead, cache the origin table and then wrap + // it to generate the system table. + if (identifier.isSystemTable()) { Identifier originIdentifier = new Identifier( identifier.getDatabaseName(), identifier.getTableName(), identifier.getBranchName(), null); - Table originTable = tableCache.getIfPresent(originIdentifier); - if (originTable == null) { - originTable = wrapped.getTable(originIdentifier); - putTableCache(originIdentifier, originTable); - } + Table originTable = getTable(originIdentifier); table = SystemTableLoader.load( - Preconditions.checkNotNull(identifier.getSystemTableName()), + checkNotNull(identifier.getSystemTableName()), (FileStoreTable) originTable); if (table == null) { throw new TableNotExistException(identifier); } - putTableCache(identifier, table); return table; } @@ -309,7 +307,7 @@ private class TableInvalidatingRemovalListener implements RemovalListener allSystemTables(Identifier ident) { - List tables = new ArrayList<>(); - for (String type : SYSTEM_TABLES) { - tables.add(Identifier.fromString(ident.getFullName() + SYSTEM_TABLE_SPLITTER + type)); + /** invalidate attached tables, such as cached branches. */ + private void invalidateAttachedTables(Identifier identifier) { + for (@NonNull Identifier i : tableCache.asMap().keySet()) { + if (identifier.getTableName().equals(i.getTableName())) { + tableCache.invalidate(i); + } } - return tables; } // ================================== refresh ================================================ diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java index 72da69b67b83..6cca6824e32b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java @@ -65,6 +65,10 @@ public Identifier(String database, String object) { this.object = object; } + public Identifier(String database, String table, @Nullable String branch) { + this(database, table, branch, null); + } + public Identifier( String database, String table, @Nullable String branch, @Nullable String systemTable) { this.database = database; @@ -119,6 +123,18 @@ public String getBranchNameOrDefault() { return systemTable; } + public boolean isMainTable() { + return getBranchName() == null && getSystemTableName() == null; + } + + public boolean isBranch() { + return getBranchName() != null && getSystemTableName() == null; + } + + public boolean isSystemTable() { + return getSystemTableName() != null; + } + private void splitObjectName() { if (table != null) { return; diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java index 7567f682ae60..4792e33c932b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java @@ -48,10 +48,8 @@ import java.io.FileNotFoundException; import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -101,14 +99,49 @@ public void testInvalidateSystemTablesIfBaseTableIsModified() throws Exception { @Test public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception { - Catalog catalog = new CachingCatalog(this.catalog); + TestableCachingCatalog catalog = + new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker); Identifier tableIdent = new Identifier("db", "tbl"); catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false); Identifier sysIdent = new Identifier("db", "tbl$files"); + // get system table will only cache the origin table catalog.getTable(sysIdent); + assertThat(catalog.tableCache.asMap()).containsKey(tableIdent); + assertThat(catalog.tableCache.asMap()).doesNotContainKey(sysIdent); + // test case sensitivity + Identifier sysIdent1 = new Identifier("db", "tbl$SNAPSHOTS"); + catalog.getTable(sysIdent1); + assertThat(catalog.tableCache.asMap()).doesNotContainKey(sysIdent1); + catalog.dropTable(tableIdent, false); + assertThat(catalog.tableCache.asMap()).doesNotContainKey(tableIdent); assertThatThrownBy(() -> catalog.getTable(sysIdent)) .hasMessage("Table db.tbl does not exist."); + assertThatThrownBy(() -> catalog.getTable(sysIdent1)) + .hasMessage("Table db.tbl does not exist."); + } + + @Test + public void testInvalidateBranchIfBaseTableIsDropped() throws Exception { + TestableCachingCatalog catalog = + new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker); + Identifier tableIdent = new Identifier("db", "tbl"); + catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false); + catalog.getTable(tableIdent).createBranch("b1"); + + Identifier branchIdent = new Identifier("db", "tbl$branch_b1"); + Identifier branchSysIdent = new Identifier("db", "tbl$branch_b1$FILES"); + // get system table will only cache the origin table + catalog.getTable(branchSysIdent); + assertThat(catalog.tableCache.asMap()).containsKey(branchIdent); + assertThat(catalog.tableCache.asMap()).doesNotContainKey(branchSysIdent); + + catalog.dropTable(tableIdent, false); + assertThat(catalog.tableCache.asMap()).doesNotContainKey(branchIdent); + assertThatThrownBy(() -> catalog.getTable(branchIdent)) + .hasMessage("Table db.tbl$branch_b1 does not exist."); + assertThatThrownBy(() -> catalog.getTable(branchSysIdent)) + .hasMessage("Table db.tbl$branch_b1 does not exist."); } @Test @@ -175,59 +208,6 @@ public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog() throws Exce assertThat(catalog.remainingAgeFor(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION); } - @Test - public void testCacheExpirationEagerlyRemovesSysTables() throws Exception { - TestableCachingCatalog catalog = - new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, ticker); - - Identifier tableIdent = new Identifier("db", "tbl"); - catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false); - Table table = catalog.getTable(tableIdent); - assertThat(catalog.tableCache().asMap()).containsKey(tableIdent); - assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO); - - ticker.advance(HALF_OF_EXPIRATION); - assertThat(catalog.tableCache().asMap()).containsKey(tableIdent); - assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION); - - for (Identifier sysTable : sysTables(tableIdent)) { - catalog.getTable(sysTable); - } - assertThat(catalog.tableCache().asMap()).containsKeys(sysTables(tableIdent)); - assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf)) - .isNotEmpty() - .allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO)); - - assertThat(catalog.remainingAgeFor(tableIdent)) - .as("Loading a non-cached sys table should refresh the main table's age") - .isEqualTo(Optional.of(EXPIRATION_TTL)); - - // Move time forward and access already cached sys tables. - ticker.advance(HALF_OF_EXPIRATION); - for (Identifier sysTable : sysTables(tableIdent)) { - catalog.getTable(sysTable); - } - assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf)) - .isNotEmpty() - .allMatch(age -> age.isPresent() && age.get().equals(Duration.ZERO)); - - assertThat(catalog.remainingAgeFor(tableIdent)) - .as("Accessing a cached sys table should not affect the main table's age") - .isEqualTo(Optional.of(HALF_OF_EXPIRATION)); - - // Move time forward so the data table drops. - ticker.advance(HALF_OF_EXPIRATION); - assertThat(catalog.tableCache().asMap()).doesNotContainKey(tableIdent); - - Arrays.stream(sysTables(tableIdent)) - .forEach( - sysTable -> - assertThat(catalog.tableCache().asMap()) - .as( - "When a data table expires, its sys tables should expire regardless of age") - .doesNotContainKeys(sysTable)); - } - @Test public void testPartitionCache() throws Exception { TestableCachingCatalog catalog = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java index c95fd62bee40..56c649028650 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java @@ -49,7 +49,10 @@ public String identifier() { }) public String[] call(ProcedureContext procedureContext, String tableId, String branchStr) throws Catalog.TableNotExistException { - catalog.getTable(Identifier.fromString(tableId)).deleteBranches(branchStr); + Identifier identifier = Identifier.fromString(tableId); + catalog.getTable(identifier).deleteBranches(branchStr); + catalog.invalidateTable( + new Identifier(identifier.getDatabaseName(), identifier.getTableName(), branchStr)); return new String[] {"Success"}; } } diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala index a3cecfc72e1d..cb449edb4ccb 100644 --- a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala +++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/DataFrameWriteTest.scala @@ -18,10 +18,15 @@ package org.apache.paimon.spark +import org.apache.spark.SparkConf import org.junit.jupiter.api.Assertions class DataFrameWriteTest extends PaimonSparkTestBase { + override protected def sparkConf: SparkConf = { + super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false") + } + test("Paimon: DataFrameWrite.saveAsTable") { import testImplicits._ diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala index 18fb9e116ba4..ab4a9bcd9dbf 100644 --- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark +import org.apache.spark.SparkConf import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions.{col, mean, window} @@ -27,6 +28,10 @@ import java.sql.Date class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { + override protected def sparkConf: SparkConf = { + super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false") + } + import testImplicits._ test("Paimon Sink: forEachBatch") { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java index 9957f0cdf91f..63d75a53ef2e 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java @@ -186,7 +186,7 @@ public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableExcep @Override public void invalidateTable(Identifier ident) { // We do not need to check whether the table exists and whether - // it is an Paimon table to reduce remote service requests. + // it is a Paimon table to reduce remote service requests. sparkCatalog.invalidateTable(ident); asTableCatalog().invalidateTable(ident); } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java index e398eee0261f..4a01c33d6af1 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DeleteBranchProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.spark.catalog.WithPaimonCatalog; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -61,13 +63,20 @@ public StructType outputType() { public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String branchStr = args.getString(1); - return modifyPaimonTable( - tableIdent, - table -> { - table.deleteBranches(branchStr); - InternalRow outputRow = newInternalRow(true); - return new InternalRow[] {outputRow}; - }); + InternalRow[] result = + modifyPaimonTable( + tableIdent, + table -> { + table.deleteBranches(branchStr); + InternalRow outputRow = newInternalRow(true); + return new InternalRow[] {outputRow}; + }); + ((WithPaimonCatalog) tableCatalog()) + .paimonCatalog() + .invalidateTable( + new org.apache.paimon.catalog.Identifier( + tableIdent.namespace()[0], tableIdent.name(), branchStr)); + return result; } public static ProcedureBuilder builder() { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala index 63203122ac40..61bf5524942d 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark +import org.apache.spark.SparkConf import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions.{col, mean, window} @@ -27,6 +28,10 @@ import java.sql.Date class PaimonSinkTest extends PaimonSparkTestBase with StreamTest { + override protected def sparkConf: SparkConf = { + super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false") + } + import testImplicits._ test("Paimon Sink: forEachBatch") { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala index 605b2e6ca5f2..867b3e5e3337 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala @@ -66,7 +66,6 @@ class PaimonSparkTestBase super.sparkConf .set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName) .set("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath) - .set("spark.sql.catalog.paimon.cache-enabled", "false") .set("spark.sql.extensions", classOf[PaimonSparkSessionExtensions].getName) .set("spark.serializer", serializer) } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala index a0a94afacfb9..edd092c85ce8 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala @@ -20,6 +20,7 @@ package org.apache.paimon.spark.sql import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.types.DecimalType import org.junit.jupiter.api.Assertions @@ -27,6 +28,11 @@ import org.junit.jupiter.api.Assertions import java.sql.{Date, Timestamp} class DataFrameWriteTest extends PaimonSparkTestBase { + + override protected def sparkConf: SparkConf = { + super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false") + } + import testImplicits._ test("Paimon: DataFrameWrite.saveAsTable") {