diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
index 37a504c5881e..7c30a1038c1b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/DynamicPartitionLoader.java
@@ -31,9 +31,10 @@
 import java.io.Serializable;
 import java.time.Duration;
 import java.time.LocalDateTime;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -45,22 +46,27 @@ public class DynamicPartitionLoader implements Serializable {
 
     private static final String MAX_PT = "max_pt()";
 
+    private static final String MAX_TWO_PT = "max_two_pt()";
+
     private final Table table;
     private final Duration refreshInterval;
+    private final int maxPartitionNum;
 
     private Comparator<InternalRow> comparator;
 
     private LocalDateTime lastRefresh;
-    @Nullable private BinaryRow partition;
+    private List<BinaryRow> partitions;
 
-    private DynamicPartitionLoader(Table table, Duration refreshInterval) {
+    private DynamicPartitionLoader(Table table, Duration refreshInterval, int maxPartitionNum) {
         this.table = table;
         this.refreshInterval = refreshInterval;
+        this.maxPartitionNum = maxPartitionNum;
     }
 
     public void open() {
         RowType partitionType = table.rowType().project(table.partitionKeys());
         this.comparator = CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
+        this.partitions = Collections.emptyList();
     }
 
     public void addPartitionKeysTo(List<String> joinKeys, List<String> projectFields) {
@@ -71,9 +77,8 @@ public void addPartitionKeysTo(List<String> joinKeys, List<String> projectFields
         partitionKeys.stream().filter(k -> !projectFields.contains(k)).forEach(projectFields::add);
     }
 
-    @Nullable
-    public BinaryRow partition() {
-        return partition;
+    public List<BinaryRow> partitions() {
+        return partitions;
     }
 
     /** @return true if partition changed. */
@@ -83,14 +88,34 @@ public boolean checkRefresh() {
             return false;
         }
 
-        BinaryRow previous = this.partition;
-        partition =
-                table.newReadBuilder().newScan().listPartitions().stream()
-                        .max(comparator)
-                        .orElse(null);
+        List<BinaryRow> newPartitions = getMaxPartitions();
         lastRefresh = LocalDateTime.now();
 
-        return !Objects.equals(previous, partition);
+        if (newPartitions.size() != partitions.size()) {
+            partitions = newPartitions;
+            return true;
+        } else {
+            for (int i = 0; i < newPartitions.size(); i++) {
+                if (comparator.compare(newPartitions.get(i), partitions.get(i)) != 0) {
+                    partitions = newPartitions;
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    private List<BinaryRow> getMaxPartitions() {
+        List<BinaryRow> newPartitions =
+                table.newReadBuilder().newScan().listPartitions().stream()
+                        .sorted(comparator.reversed())
+                        .collect(Collectors.toList());
+
+        if (newPartitions.size() <= maxPartitionNum) {
+            return newPartitions;
+        } else {
+            return newPartitions.subList(0, maxPartitionNum);
+        }
     }
 
     @Nullable
@@ -101,13 +126,21 @@ public static DynamicPartitionLoader of(Table table) {
             return null;
         }
 
-        if (!dynamicPartition.equalsIgnoreCase(MAX_PT)) {
-            throw new UnsupportedOperationException(
-                    "Unsupported dynamic partition pattern: " + dynamicPartition);
+        int maxPartitionNum;
+        switch (dynamicPartition.toLowerCase()) {
+            case MAX_PT:
+                maxPartitionNum = 1;
+                break;
+            case MAX_TWO_PT:
+                maxPartitionNum = 2;
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported dynamic partition pattern: " + dynamicPartition);
         }
 
         Duration refresh =
                 options.get(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL);
-        return new DynamicPartitionLoader(table, refresh);
+        return new DynamicPartitionLoader(table, refresh, maxPartitionNum);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index e3f2fe110c6c..daf196d37187 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -28,6 +28,7 @@
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.OutOfRangeException;
@@ -203,9 +204,9 @@ private void open() throws Exception {
         if (partitionLoader != null) {
             partitionLoader.open();
             partitionLoader.checkRefresh();
-            BinaryRow partition = partitionLoader.partition();
-            if (partition != null) {
-                lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
+            List<BinaryRow> partitions = partitionLoader.partitions();
+            if (!partitions.isEmpty()) {
+                lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
             }
         }
 
@@ -236,17 +237,17 @@ public Collection<RowData> lookup(RowData keyRow) {
             tryRefresh();
 
             InternalRow key = new FlinkRowWrapper(keyRow);
-            if (partitionLoader != null) {
-                if (partitionLoader.partition() == null) {
-                    return Collections.emptyList();
-                }
-                key = JoinedRow.join(key, partitionLoader.partition());
+            if (partitionLoader == null) {
+                return lookupInternal(key);
+            }
+
+            if (partitionLoader.partitions().isEmpty()) {
+                return Collections.emptyList();
             }
 
-            List<InternalRow> results = lookupTable.get(key);
-            List<RowData> rows = new ArrayList<>(results.size());
-            for (InternalRow matchedRow : results) {
-                rows.add(new FlinkRowData(matchedRow));
+            List<RowData> rows = new ArrayList<>();
+            for (BinaryRow partition : partitionLoader.partitions()) {
+                rows.addAll(lookupInternal(JoinedRow.join(key, partition)));
             }
             return rows;
         } catch (OutOfRangeException | ReopenException e) {
@@ -257,7 +258,28 @@ public Collection<RowData> lookup(RowData keyRow) {
         }
     }
 
-    private Predicate createSpecificPartFilter(BinaryRow partition) {
+    private List<RowData> lookupInternal(InternalRow key) throws IOException {
+        List<RowData> rows = new ArrayList<>();
+        List<InternalRow> lookupResults = lookupTable.get(key);
+        for (InternalRow matchedRow : lookupResults) {
+            rows.add(new FlinkRowData(matchedRow));
+        }
+        return rows;
+    }
+
+    private Predicate createSpecificPartFilter(List<BinaryRow> partitions) {
+        Predicate partFilter = null;
+        for (BinaryRow partition : partitions) {
+            if (partFilter == null) {
+                partFilter = createSinglePartFilter(partition);
+            } else {
+                partFilter = PredicateBuilder.or(partFilter, createSinglePartFilter(partition));
+            }
+        }
+        return partFilter;
+    }
+
+    private Predicate createSinglePartFilter(BinaryRow partition) {
         RowType rowType = table.rowType();
         List<String> partitionKeys = table.partitionKeys();
         Object[] partitionSpec =
@@ -291,15 +313,15 @@ void tryRefresh() throws Exception {
         // 2. refresh dynamic partition
         if (partitionLoader != null) {
             boolean partitionChanged = partitionLoader.checkRefresh();
-            BinaryRow partition = partitionLoader.partition();
-            if (partition == null) {
+            List<BinaryRow> partitions = partitionLoader.partitions();
+            if (partitions.isEmpty()) {
                 // no data to be load, fast exit
                 return;
             }
 
             if (partitionChanged) {
                 // reopen with latest partition
-                lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
+                lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
                 lookupTable.close();
                 lookupTable.open();
                 // no need to refresh the lookup table because it is reopened
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
index a6abde57b80c..86d4810c5992 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupJoinITCase.java
@@ -1006,4 +1006,43 @@ public void testOverwriteDimTable(boolean isPkTable) throws Exception {
 
         iterator.close();
     }
+
+    @ParameterizedTest
+    @EnumSource(LookupCacheMode.class)
+    public void testLookupMaxTwoPt0(LookupCacheMode mode) throws Exception {
+        sql(
+                "CREATE TABLE PARTITIONED_DIM (pt STRING, i INT, v INT)"
+                        + "PARTITIONED BY (`pt`) WITH ("
+                        + "'lookup.dynamic-partition' = 'max_two_pt()', "
+                        + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
+                        + "'lookup.cache' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                mode);
+
+        String query =
+                "SELECT D.pt, T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('2024-10-01', 1, 1), ('2024-10-01', 2, 2)");
+        Thread.sleep(500); // wait refresh
+        sql("INSERT INTO T VALUES (1)");
+        List<Row> result = iterator.collect(1);
+        assertThat(result).containsExactlyInAnyOrder(Row.of("2024-10-01", 1, 1));
+
+        sql("INSERT INTO PARTITIONED_DIM VALUES ('2024-10-02', 2, 2)");
+        Thread.sleep(500); // wait refresh
+        sql("INSERT INTO T VALUES (2)");
+        result = iterator.collect(2);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of("2024-10-01", 2, 2), Row.of("2024-10-02", 2, 2));
+
+        sql("ALTER TABLE PARTITIONED_DIM DROP PARTITION (pt = '2024-10-01')");
+        Thread.sleep(500); // wait refresh
+        sql("INSERT INTO T VALUES (1), (2)");
+        result = iterator.collect(2);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(null, 1, null), Row.of("2024-10-02", 2, 2));
+
+        iterator.close();
+    }
 }