diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index de69c67a4c44..4154b6742c06 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.CoreOptions; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -79,7 +80,7 @@ public abstract class FullCacheLookupTable implements LookupTable { protected final int appendUdsFieldNumber; protected RocksDBStateFactory stateFactory; - @Nullable private final ExecutorService refreshExecutor; + @Nullable private ExecutorService refreshExecutor; private final AtomicReference cachedException; private final int maxPendingSnapshotCount; private final FileStoreTable table; @@ -127,14 +128,6 @@ public FullCacheLookupTable(Context context) { Options options = Options.fromMap(context.table.options()); this.projectedType = projectedType; this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC); - this.refreshExecutor = - this.refreshAsync - ? Executors.newSingleThreadExecutor( - new ExecutorThreadFactory( - String.format( - "%s-lookup-refresh", - Thread.currentThread().getName()))) - : null; this.cachedException = new AtomicReference<>(); this.maxPendingSnapshotCount = options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT); } @@ -149,12 +142,20 @@ public void specifyCacheRowFilter(Filter filter) { this.cacheRowFilter = filter; } - protected void openStateFactory() throws Exception { + protected void init() throws Exception { this.stateFactory = new RocksDBStateFactory( context.tempPath.toString(), context.table.coreOptions().toConfiguration(), null); + this.refreshExecutor = + this.refreshAsync + ? Executors.newSingleThreadExecutor( + new ExecutorThreadFactory( + String.format( + "%s-lookup-refresh", + Thread.currentThread().getName()))) + : null; } protected void bootstrap() throws Exception { @@ -322,6 +323,11 @@ public void close() throws IOException { } } + @VisibleForTesting + public Future getRefreshFuture() { + return refreshFuture; + } + /** Bulk loader for the table. */ public interface TableBulkLoader { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java index 84587083bf1a..63af4f3506f0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java @@ -52,7 +52,7 @@ public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) { @Override public void open() throws Exception { - openStateFactory(); + init(); this.state = stateFactory.listState( "join-key-index", diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java index c06120d61d81..2a3099e9a62b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java @@ -69,7 +69,7 @@ public PrimaryKeyLookupTable(Context context, long lruCacheSize, List jo @Override public void open() throws Exception { - openStateFactory(); + init(); createTableState(); bootstrap(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java index 5ebace6cd5c6..11c9cba24b39 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java @@ -46,7 +46,7 @@ public SecondaryIndexLookupTable(Context context, long lruCacheSize) { @Override public void open() throws Exception { - openStateFactory(); + init(); createTableState(); this.indexState = stateFactory.setState( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 46c61a15bd8a..88b947133087 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -600,6 +600,41 @@ public void testPkTableWithCacheRowFilter() throws Exception { assertThat(res).isEmpty(); } + @Test + public void testRefreshExecutorRebuildAfterReopen() throws Exception { + Options options = new Options(); + options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, true); + FileStoreTable storeTable = createTable(singletonList("f0"), options); + writeWithBucketAssigner( + storeTable, row -> 0, GenericRow.of(1, 11, 111), GenericRow.of(2, 22, 222)); + + FullCacheLookupTable.Context context = + new FullCacheLookupTable.Context( + storeTable, + new int[] {0, 1, 2}, + null, + null, + tempDir.toFile(), + singletonList("f0"), + null); + table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); + assertThat(table).isInstanceOf(PrimaryKeyLookupTable.class); + table.open(); + // reopen + table.close(); + table.open(); + List res = table.get(GenericRow.of(1)); + assertThat(res).hasSize(1); + assertRow(res.get(0), 1, 11, 111); + writeWithBucketAssigner(storeTable, row -> 0, GenericRow.of(1, 22, 222)); + table.refresh(); + assertThat(table.getRefreshFuture()).isNotNull(); + table.getRefreshFuture().get(); + res = table.get(GenericRow.of(1)); + assertThat(res).hasSize(1); + assertRow(res.get(0), 1, 22, 222); + } + @Test public void testNoPkTableWithCacheRowFilter() throws Exception { FileStoreTable storeTable = createTable(emptyList(), new Options());