From 638a36c80b275ffa7f1491e029400a23e63b1b28 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Tue, 7 Jan 2025 13:05:32 +0800 Subject: [PATCH 1/2] [flink] Fix the refresh executor not work after reopen --- .../flink/lookup/FullCacheLookupTable.java | 29 ++++++++++----- .../flink/lookup/NoPrimaryKeyLookupTable.java | 2 +- .../flink/lookup/PrimaryKeyLookupTable.java | 2 +- .../lookup/SecondaryIndexLookupTable.java | 2 +- .../paimon/flink/lookup/LookupTableTest.java | 35 +++++++++++++++++++ 5 files changed, 58 insertions(+), 12 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index de69c67a4c44..a20e1d4d5442 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.CoreOptions; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -79,7 +80,7 @@ public abstract class FullCacheLookupTable implements LookupTable { protected final int appendUdsFieldNumber; protected RocksDBStateFactory stateFactory; - @Nullable private final ExecutorService refreshExecutor; + @Nullable private ExecutorService refreshExecutor; private final AtomicReference cachedException; private final int maxPendingSnapshotCount; private final FileStoreTable table; @@ -127,14 +128,6 @@ public FullCacheLookupTable(Context context) { Options options = Options.fromMap(context.table.options()); this.projectedType = projectedType; this.refreshAsync = options.get(LOOKUP_REFRESH_ASYNC); - this.refreshExecutor = - this.refreshAsync - ? Executors.newSingleThreadExecutor( - new ExecutorThreadFactory( - String.format( - "%s-lookup-refresh", - Thread.currentThread().getName()))) - : null; this.cachedException = new AtomicReference<>(); this.maxPendingSnapshotCount = options.get(LOOKUP_REFRESH_ASYNC_PENDING_SNAPSHOT_COUNT); } @@ -149,6 +142,19 @@ public void specifyCacheRowFilter(Filter filter) { this.cacheRowFilter = filter; } + @Override + public void open() throws Exception { + this.refreshExecutor = + this.refreshAsync + ? Executors.newSingleThreadExecutor( + new ExecutorThreadFactory( + String.format( + "%s-lookup-refresh", + Thread.currentThread().getName()))) + : null; + openStateFactory(); + } + protected void openStateFactory() throws Exception { this.stateFactory = new RocksDBStateFactory( @@ -322,6 +328,11 @@ public void close() throws IOException { } } + @VisibleForTesting + public Future getRefreshFuture() { + return refreshFuture; + } + /** Bulk loader for the table. */ public interface TableBulkLoader { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java index 84587083bf1a..8ffcc58bd07b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java @@ -52,7 +52,7 @@ public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) { @Override public void open() throws Exception { - openStateFactory(); + super.open(); this.state = stateFactory.listState( "join-key-index", diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java index c06120d61d81..77ff112ef09a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java @@ -69,7 +69,7 @@ public PrimaryKeyLookupTable(Context context, long lruCacheSize, List jo @Override public void open() throws Exception { - openStateFactory(); + super.open(); createTableState(); bootstrap(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java index 5ebace6cd5c6..3270abf6de2b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java @@ -46,7 +46,7 @@ public SecondaryIndexLookupTable(Context context, long lruCacheSize) { @Override public void open() throws Exception { - openStateFactory(); + super.open(); createTableState(); this.indexState = stateFactory.setState( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 46c61a15bd8a..88b947133087 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -600,6 +600,41 @@ public void testPkTableWithCacheRowFilter() throws Exception { assertThat(res).isEmpty(); } + @Test + public void testRefreshExecutorRebuildAfterReopen() throws Exception { + Options options = new Options(); + options.set(FlinkConnectorOptions.LOOKUP_REFRESH_ASYNC, true); + FileStoreTable storeTable = createTable(singletonList("f0"), options); + writeWithBucketAssigner( + storeTable, row -> 0, GenericRow.of(1, 11, 111), GenericRow.of(2, 22, 222)); + + FullCacheLookupTable.Context context = + new FullCacheLookupTable.Context( + storeTable, + new int[] {0, 1, 2}, + null, + null, + tempDir.toFile(), + singletonList("f0"), + null); + table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); + assertThat(table).isInstanceOf(PrimaryKeyLookupTable.class); + table.open(); + // reopen + table.close(); + table.open(); + List res = table.get(GenericRow.of(1)); + assertThat(res).hasSize(1); + assertRow(res.get(0), 1, 11, 111); + writeWithBucketAssigner(storeTable, row -> 0, GenericRow.of(1, 22, 222)); + table.refresh(); + assertThat(table.getRefreshFuture()).isNotNull(); + table.getRefreshFuture().get(); + res = table.get(GenericRow.of(1)); + assertThat(res).hasSize(1); + assertRow(res.get(0), 1, 22, 222); + } + @Test public void testNoPkTableWithCacheRowFilter() throws Exception { FileStoreTable storeTable = createTable(emptyList(), new Options()); From d7b709aacaebaeee2ecbcac5bee51a0180aec1bc Mon Sep 17 00:00:00 2001 From: Aitozi Date: Tue, 7 Jan 2025 13:31:29 +0800 Subject: [PATCH 2/2] fix double open --- .../flink/lookup/FullCacheLookupTable.java | 17 ++++++----------- .../flink/lookup/NoPrimaryKeyLookupTable.java | 2 +- .../flink/lookup/PrimaryKeyLookupTable.java | 2 +- .../flink/lookup/SecondaryIndexLookupTable.java | 2 +- 4 files changed, 9 insertions(+), 14 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index a20e1d4d5442..4154b6742c06 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -142,8 +142,12 @@ public void specifyCacheRowFilter(Filter filter) { this.cacheRowFilter = filter; } - @Override - public void open() throws Exception { + protected void init() throws Exception { + this.stateFactory = + new RocksDBStateFactory( + context.tempPath.toString(), + context.table.coreOptions().toConfiguration(), + null); this.refreshExecutor = this.refreshAsync ? Executors.newSingleThreadExecutor( @@ -152,15 +156,6 @@ public void open() throws Exception { "%s-lookup-refresh", Thread.currentThread().getName()))) : null; - openStateFactory(); - } - - protected void openStateFactory() throws Exception { - this.stateFactory = - new RocksDBStateFactory( - context.tempPath.toString(), - context.table.coreOptions().toConfiguration(), - null); } protected void bootstrap() throws Exception { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java index 8ffcc58bd07b..63af4f3506f0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/NoPrimaryKeyLookupTable.java @@ -52,7 +52,7 @@ public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) { @Override public void open() throws Exception { - super.open(); + init(); this.state = stateFactory.listState( "join-key-index", diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java index 77ff112ef09a..2a3099e9a62b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyLookupTable.java @@ -69,7 +69,7 @@ public PrimaryKeyLookupTable(Context context, long lruCacheSize, List jo @Override public void open() throws Exception { - super.open(); + init(); createTableState(); bootstrap(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java index 3270abf6de2b..11c9cba24b39 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/SecondaryIndexLookupTable.java @@ -46,7 +46,7 @@ public SecondaryIndexLookupTable(Context context, long lruCacheSize) { @Override public void open() throws Exception { - super.open(); + init(); createTableState(); this.indexState = stateFactory.setState(