From b087973bcd65547a613e3093cc0bf2ded0b46ffc Mon Sep 17 00:00:00 2001 From: huyuanfeng Date: Wed, 10 Sep 2025 15:03:13 +0800 Subject: [PATCH 1/2] [core] Support push down branchesTable by branchName --- .../paimon/table/system/BranchesTable.java | 106 ++++++++++++++---- .../apache/paimon/flink/BranchSqlITCase.java | 38 ++++++- 2 files changed, 120 insertions(+), 24 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java index 2330796a33b5..490f683c63bc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java @@ -25,6 +25,12 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.InPredicateVisitor; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.LeafPredicateExtractor; +import org.apache.paimon.predicate.Or; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.FileStoreTable; @@ -48,6 +54,8 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; @@ -67,11 +75,12 @@ public class BranchesTable implements ReadonlyTable { public static final String BRANCHES = "branches"; + private static final String BRANCH_NAME = "branch_name"; + public static final RowType TABLE_TYPE = new RowType( Arrays.asList( - new DataField( - 0, "branch_name", SerializationUtils.newStringType(false)), + new DataField(0, BRANCH_NAME, SerializationUtils.newStringType(false)), new DataField(1, "create_time", new TimestampType(false, 3)))); private final FileIO fileIO; @@ -97,7 +106,7 @@ public RowType rowType() { @Override public List primaryKeys() { - return Collections.singletonList("branch_name"); + return Collections.singletonList(BRANCH_NAME); } @Override @@ -121,16 +130,21 @@ public Table copy(Map dynamicOptions) { } private class BranchesScan extends ReadOnceTableScan { + private @Nullable Predicate branchPredicate; @Override public InnerTableScan withFilter(Predicate predicate) { - // TODO + if (predicate == null) { + return this; + } + branchPredicate = predicate; + return this; } @Override public Plan innerPlan() { - return () -> Collections.singletonList(new BranchesSplit(location)); + return () -> Collections.singletonList(new BranchesSplit(location, branchPredicate)); } } @@ -139,8 +153,11 @@ private static class BranchesSplit extends SingletonSplit { private final Path location; - private BranchesSplit(Path location) { + private final @Nullable Predicate branchPredicate; + + private BranchesSplit(Path location, @Nullable Predicate branchPredicate) { this.location = location; + this.branchPredicate = branchPredicate; } @Override @@ -152,7 +169,8 @@ public boolean equals(Object o) { return false; } BranchesSplit that = (BranchesSplit) o; - return Objects.equals(location, that.location); + return Objects.equals(location, that.location) + && Objects.equals(branchPredicate, that.branchPredicate); } @Override @@ -194,10 +212,11 @@ public RecordReader createReader(Split split) { } Path location = ((BranchesSplit) split).location; + Predicate predicate = ((BranchesSplit) split).branchPredicate; FileStoreTable table = FileStoreTableFactory.create(fileIO, location); Iterator rows; try { - rows = branches(table).iterator(); + rows = branches(table, predicate).iterator(); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -214,23 +233,66 @@ public RecordReader createReader(Split split) { return new IteratorRecordReader<>(rows); } - private List branches(FileStoreTable table) throws IOException { - BranchManager branchManager = table.branchManager(); + /** + * Creates an InternalRow for a branch with its creation time. + * + * @param branchName the name of the branch + * @param tablePath the table path + * @return InternalRow containing branch name and creation time + * @throws IOException if unable to get file status + */ + private InternalRow createBranchRow(String branchName, Path tablePath) throws IOException { + String branchPath = BranchManager.branchPath(tablePath, branchName); + long creationTime = fileIO.getFileStatus(new Path(branchPath)).getModificationTime(); + return GenericRow.of( + BinaryString.fromString(branchName), + Timestamp.fromLocalDateTime(DateTimeUtils.toLocalDateTime(creationTime))); + } - List result = new ArrayList<>(); - List branches = branchManager.branches(); + private List branches(FileStoreTable table, Predicate predicate) + throws IOException { + BranchManager branchManager = table.branchManager(); Path tablePath = table.location(); - for (String branch : branches) { - String branchPath = BranchManager.branchPath(tablePath, branch); - long creationTime = - fileIO.getFileStatus(new Path(branchPath)).getModificationTime(); - result.add( - GenericRow.of( - BinaryString.fromString(branch), - Timestamp.fromLocalDateTime( - DateTimeUtils.toLocalDateTime(creationTime)))); + List result = new ArrayList<>(); + // Handle predicate filtering for branch_name + if (predicate != null) { + // Handle Equal predicate + if (predicate instanceof LeafPredicate + && ((LeafPredicate) predicate).function() instanceof Equal + && ((LeafPredicate) predicate).literals().get(0) instanceof BinaryString + && predicate.visit(LeafPredicateExtractor.INSTANCE).get(BRANCH_NAME) + != null) { + String equalValue = ((LeafPredicate) predicate).literals().get(0).toString(); + if (branchManager.branchExists(equalValue)) { + result.add(createBranchRow(equalValue, tablePath)); + } + } + + // Handle CompoundPredicate (OR case for IN filter) + if (predicate instanceof CompoundPredicate) { + CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; + if ((compoundPredicate.function()) instanceof Or) { + List branchNames = new ArrayList<>(); + InPredicateVisitor.extractInElements(predicate, BRANCH_NAME) + .ifPresent( + e -> + e.stream() + .map(Object::toString) + .forEach(branchNames::add)); + for (String branchName : branchNames) { + if (branchManager.branchExists(branchName)) { + result.add(createBranchRow(branchName, tablePath)); + } + } + } + } + } else { + // Fallback to original logic if no predicate + List branches = branchManager.branches(); + for (String branch : branches) { + result.add(createBranchRow(branch, tablePath)); + } } - return result; } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index c593db273b79..2d5fd8cb2a5a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -23,14 +23,13 @@ import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.SnapshotManager; -import org.apache.paimon.shade.org.apache.commons.lang3.StringUtils; - import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; import org.assertj.core.api.AssertionsForInterfaceTypes; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils; import java.io.IOException; import java.util.ArrayList; @@ -834,6 +833,41 @@ public void testMainAndFallbackNoPrimaryKeys() throws Exception { "+I[2, 220, 2200]"); } + @Test + public void testBranchesTableFilter() throws Exception { + sql("CREATE TABLE T (a INT, b INT)"); + sql("INSERT INTO T VALUES (1, 1)"); + sql("CALL sys.create_branch('default.T', 'b1')"); + sql("CALL sys.create_branch('default.T', 'b2')"); + sql("CALL sys.create_branch('default.T', 'b3')"); + + // no filter + assertThat(collectResult("SELECT branch_name FROM `T$branches`")) + .containsExactlyInAnyOrder("+I[b1]", "+I[b2]", "+I[b3]"); + + // equals + assertThat(collectResult("SELECT branch_name FROM `T$branches` WHERE branch_name = 'b2'")) + .containsExactlyInAnyOrder("+I[b2]"); + + // in + assertThat( + collectResult( + "SELECT branch_name FROM `T$branches` WHERE branch_name IN ('b1', 'b3')")) + .containsExactlyInAnyOrder("+I[b1]", "+I[b3]"); + + // in with non-existent branch + assertThat( + collectResult( + "SELECT branch_name FROM `T$branches` WHERE branch_name IN ('b1', 'non_existent')")) + .containsExactlyInAnyOrder("+I[b1]"); + + // equals with non-existent branch + assertThat( + collectResult( + "SELECT branch_name FROM `T$branches` WHERE branch_name = 'non_existent'")) + .isEmpty(); + } + private List collectResult(String sql) throws Exception { List result = new ArrayList<>(); try (CloseableIterator it = tEnv.executeSql(sql).collect()) { From 7641dcb2c6fbde7da73f933b86c59b25c6388ad0 Mon Sep 17 00:00:00 2001 From: huyuanfeng Date: Wed, 10 Sep 2025 17:54:59 +0800 Subject: [PATCH 2/2] [core] Support push down branchesTable by branchName --- .../src/test/java/org/apache/paimon/flink/BranchSqlITCase.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 2d5fd8cb2a5a..9850845fefa8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -23,13 +23,14 @@ import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.shade.org.apache.commons.lang3.StringUtils; + import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; import org.assertj.core.api.AssertionsForInterfaceTypes; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils; import java.io.IOException; import java.util.ArrayList;