Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@
import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION;
import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand All @@ -45,22 +46,27 @@ public class DynamicPartitionLoader implements Serializable {

private static final String MAX_PT = "max_pt()";

private static final String MAX_TWO_PT = "max_two_pt()";

private final Table table;
private final Duration refreshInterval;
private final int maxPartitionNum;

private Comparator<InternalRow> comparator;

private LocalDateTime lastRefresh;
@Nullable private BinaryRow partition;
private List<BinaryRow> partitions;

private DynamicPartitionLoader(Table table, Duration refreshInterval) {
/**
 * Private constructor; instances are created via the static {@code of(Table)} factory,
 * which derives {@code maxPartitionNum} from the table's dynamic-partition option.
 *
 * @param table the table whose partitions are tracked
 * @param refreshInterval minimum interval between partition-list refreshes
 * @param maxPartitionNum maximum number of (newest) partitions to keep loaded
 */
private DynamicPartitionLoader(Table table, Duration refreshInterval, int maxPartitionNum) {
    this.table = table;
    this.refreshInterval = refreshInterval;
    this.maxPartitionNum = maxPartitionNum;
}

/**
 * Initializes runtime state: generates a record comparator over the table's
 * partition columns (used to order partitions) and starts with an empty
 * partition list until the first refresh populates it.
 */
public void open() {
    RowType partitionType = table.rowType().project(table.partitionKeys());
    this.comparator = CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
    this.partitions = Collections.emptyList();
}

public void addPartitionKeysTo(List<String> joinKeys, List<String> projectFields) {
Expand All @@ -71,9 +77,8 @@ public void addPartitionKeysTo(List<String> joinKeys, List<String> projectFields
partitionKeys.stream().filter(k -> !projectFields.contains(k)).forEach(projectFields::add);
}

@Nullable
public BinaryRow partition() {
return partition;
/**
 * Returns the currently loaded partitions, newest first. Empty until
 * {@link #open()} has been called and a refresh has found partitions.
 */
public List<BinaryRow> partitions() {
    return partitions;
}

/** @return true if partition changed. */
Expand All @@ -83,14 +88,34 @@ public boolean checkRefresh() {
return false;
}

BinaryRow previous = this.partition;
partition =
table.newReadBuilder().newScan().listPartitions().stream()
.max(comparator)
.orElse(null);
List<BinaryRow> newPartitions = getMaxPartitions();
lastRefresh = LocalDateTime.now();

return !Objects.equals(previous, partition);
if (newPartitions.size() != partitions.size()) {
partitions = newPartitions;
return true;
} else {
for (int i = 0; i < newPartitions.size(); i++) {
if (comparator.compare(newPartitions.get(i), partitions.get(i)) != 0) {
partitions = newPartitions;
return true;
}
}
return false;
}
}

/**
 * Lists all partitions of the table, orders them from newest to oldest using
 * the partition-key comparator, and retains at most {@code maxPartitionNum}
 * of the newest ones.
 *
 * @return up to {@code maxPartitionNum} partitions in descending order
 */
private List<BinaryRow> getMaxPartitions() {
    List<BinaryRow> descending =
            table.newReadBuilder().newScan().listPartitions().stream()
                    .sorted(comparator.reversed())
                    .collect(Collectors.toList());
    // Truncate to the configured maximum only when there are more than enough.
    return descending.size() <= maxPartitionNum
            ? descending
            : descending.subList(0, maxPartitionNum);
}

@Nullable
Expand All @@ -101,13 +126,21 @@ public static DynamicPartitionLoader of(Table table) {
return null;
}

if (!dynamicPartition.equalsIgnoreCase(MAX_PT)) {
throw new UnsupportedOperationException(
"Unsupported dynamic partition pattern: " + dynamicPartition);
int maxPartitionNum;
switch (dynamicPartition.toLowerCase()) {
case MAX_PT:
maxPartitionNum = 1;
break;
case MAX_TWO_PT:
maxPartitionNum = 2;
break;
default:
throw new UnsupportedOperationException(
"Unsupported dynamic partition pattern: " + dynamicPartition);
}

Duration refresh =
options.get(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL);
return new DynamicPartitionLoader(table, refresh);
return new DynamicPartitionLoader(table, refresh, maxPartitionNum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
Expand Down Expand Up @@ -203,9 +204,9 @@ private void open() throws Exception {
if (partitionLoader != null) {
partitionLoader.open();
partitionLoader.checkRefresh();
BinaryRow partition = partitionLoader.partition();
if (partition != null) {
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
List<BinaryRow> partitions = partitionLoader.partitions();
if (!partitions.isEmpty()) {
lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
}
}

Expand Down Expand Up @@ -236,17 +237,17 @@ public Collection<RowData> lookup(RowData keyRow) {
tryRefresh();

InternalRow key = new FlinkRowWrapper(keyRow);
if (partitionLoader != null) {
if (partitionLoader.partition() == null) {
return Collections.emptyList();
}
key = JoinedRow.join(key, partitionLoader.partition());
if (partitionLoader == null) {
return lookupInternal(key);
}

if (partitionLoader.partitions().isEmpty()) {
return Collections.emptyList();
}

List<InternalRow> results = lookupTable.get(key);
List<RowData> rows = new ArrayList<>(results.size());
for (InternalRow matchedRow : results) {
rows.add(new FlinkRowData(matchedRow));
List<RowData> rows = new ArrayList<>();
for (BinaryRow partition : partitionLoader.partitions()) {
rows.addAll(lookupInternal(JoinedRow.join(key, partition)));
}
return rows;
} catch (OutOfRangeException | ReopenException e) {
Expand All @@ -257,7 +258,28 @@ public Collection<RowData> lookup(RowData keyRow) {
}
}

private Predicate createSpecificPartFilter(BinaryRow partition) {
/**
 * Looks up a single (possibly partition-joined) key in the lookup table and
 * wraps every matched row into a Flink {@link FlinkRowData}.
 *
 * @param key the lookup key handed to the underlying lookup table
 * @return matched rows wrapped for Flink; empty list when nothing matches
 * @throws IOException if the lookup table access fails
 */
private List<RowData> lookupInternal(InternalRow key) throws IOException {
    List<InternalRow> lookupResults = lookupTable.get(key);
    // Presize to the match count to avoid resizing while copying
    // (matches the pre-refactor behavior of this wrapping loop).
    List<RowData> rows = new ArrayList<>(lookupResults.size());
    for (InternalRow matchedRow : lookupResults) {
        rows.add(new FlinkRowData(matchedRow));
    }
    return rows;
}

/**
 * Builds a predicate that accepts rows belonging to any of the given
 * partitions by OR-ing the per-partition filters together.
 *
 * @param partitions the partitions to match
 * @return the combined filter, or {@code null} when {@code partitions} is empty
 */
private Predicate createSpecificPartFilter(List<BinaryRow> partitions) {
    // Sequential reduce left-folds the OR chain exactly like the explicit loop.
    return partitions.stream()
            .map(this::createSinglePartFilter)
            .reduce(PredicateBuilder::or)
            .orElse(null);
}

private Predicate createSinglePartFilter(BinaryRow partition) {
RowType rowType = table.rowType();
List<String> partitionKeys = table.partitionKeys();
Object[] partitionSpec =
Expand Down Expand Up @@ -291,15 +313,15 @@ void tryRefresh() throws Exception {
// 2. refresh dynamic partition
if (partitionLoader != null) {
boolean partitionChanged = partitionLoader.checkRefresh();
BinaryRow partition = partitionLoader.partition();
if (partition == null) {
List<BinaryRow> partitions = partitionLoader.partitions();
if (partitions.isEmpty()) {
// no data to be load, fast exit
return;
}

if (partitionChanged) {
// reopen with latest partition
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
lookupTable.close();
lookupTable.open();
// no need to refresh the lookup table because it is reopened
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,4 +1006,43 @@ public void testOverwriteDimTable(boolean isPkTable) throws Exception {

iterator.close();
}

/**
 * Verifies the {@code max_two_pt()} dynamic-partition mode for lookup joins:
 * the dimension table should serve rows from (at most) the two newest
 * partitions, and the served set should track partition creation and drops
 * as the loader refreshes.
 */
@ParameterizedTest
@EnumSource(LookupCacheMode.class)
public void testLookupMaxTwoPt0(LookupCacheMode mode) throws Exception {
    sql(
            "CREATE TABLE PARTITIONED_DIM (pt STRING, i INT, v INT)"
                    + "PARTITIONED BY (`pt`) WITH ("
                    + "'lookup.dynamic-partition' = 'max_two_pt()', "
                    + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
                    + "'lookup.cache' = '%s', "
                    + "'continuous.discovery-interval'='1 ms')",
            mode);

    String query =
            "SELECT D.pt, T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i";
    BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());

    // Single partition: lookup matches within it.
    sql("INSERT INTO PARTITIONED_DIM VALUES ('2024-10-01', 1, 1), ('2024-10-01', 2, 2)");
    Thread.sleep(500); // wait refresh
    sql("INSERT INTO T VALUES (1)");
    List<Row> result = iterator.collect(1);
    assertThat(result).containsExactlyInAnyOrder(Row.of("2024-10-01", 1, 1));

    // Second partition added: key 2 now matches in BOTH loaded partitions.
    sql("INSERT INTO PARTITIONED_DIM VALUES ('2024-10-02', 2, 2)");
    Thread.sleep(500); // wait refresh
    sql("INSERT INTO T VALUES (2)");
    result = iterator.collect(2);
    assertThat(result)
            .containsExactlyInAnyOrder(Row.of("2024-10-01", 2, 2), Row.of("2024-10-02", 2, 2));

    // Oldest partition dropped: key 1 no longer matches (NULL-padded by the
    // left join), key 2 matches only in the surviving partition.
    sql("ALTER TABLE PARTITIONED_DIM DROP PARTITION (pt = '2024-10-01')");
    Thread.sleep(500); // wait refresh
    sql("INSERT INTO T VALUES (1), (2)");
    result = iterator.collect(2);
    assertThat(result)
            .containsExactlyInAnyOrder(Row.of(null, 1, null), Row.of("2024-10-02", 2, 2));

    iterator.close();
}
}
Loading