diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java index 2330796a33b5..490f683c63bc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java @@ -25,6 +25,12 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.InPredicateVisitor; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.LeafPredicateExtractor; +import org.apache.paimon.predicate.Or; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.FileStoreTable; @@ -48,6 +54,8 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; +import javax.annotation.Nullable; + import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; @@ -67,11 +75,12 @@ public class BranchesTable implements ReadonlyTable { public static final String BRANCHES = "branches"; + private static final String BRANCH_NAME = "branch_name"; + public static final RowType TABLE_TYPE = new RowType( Arrays.asList( - new DataField( - 0, "branch_name", SerializationUtils.newStringType(false)), + new DataField(0, BRANCH_NAME, SerializationUtils.newStringType(false)), new DataField(1, "create_time", new TimestampType(false, 3)))); private final FileIO fileIO; @@ -97,7 +106,7 @@ public RowType rowType() { @Override public List primaryKeys() { - return Collections.singletonList("branch_name"); + return Collections.singletonList(BRANCH_NAME); } @Override @@ -121,16 +130,21 @@ public Table copy(Map dynamicOptions) { } private class BranchesScan extends ReadOnceTableScan { + private @Nullable Predicate branchPredicate; @Override public InnerTableScan withFilter(Predicate predicate) { - // TODO + if (predicate == null) { + return this; + } + branchPredicate = predicate; + return this; } @Override public Plan innerPlan() { - return () -> Collections.singletonList(new BranchesSplit(location)); + return () -> Collections.singletonList(new BranchesSplit(location, branchPredicate)); } } @@ -139,8 +153,11 @@ private static class BranchesSplit extends SingletonSplit { private final Path location; - private BranchesSplit(Path location) { + private final @Nullable Predicate branchPredicate; + + private BranchesSplit(Path location, @Nullable Predicate branchPredicate) { this.location = location; + this.branchPredicate = branchPredicate; } @Override @@ -152,7 +169,8 @@ public boolean equals(Object o) { return false; } BranchesSplit that = (BranchesSplit) o; - return Objects.equals(location, that.location); + return Objects.equals(location, that.location) + && Objects.equals(branchPredicate, that.branchPredicate); } @Override @@ -194,10 +212,11 @@ public RecordReader createReader(Split split) { } Path location = ((BranchesSplit) split).location; + Predicate predicate = ((BranchesSplit) split).branchPredicate; FileStoreTable table = FileStoreTableFactory.create(fileIO, location); Iterator rows; try { - rows = branches(table).iterator(); + rows = branches(table, predicate).iterator(); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -214,23 +233,66 @@ public RecordReader createReader(Split split) { return new IteratorRecordReader<>(rows); } - private List branches(FileStoreTable table) throws IOException { - BranchManager branchManager = table.branchManager(); + /** + * Creates an InternalRow for a branch with its creation time. + * + * @param branchName the name of the branch + * @param tablePath the table path + * @return InternalRow containing branch name and creation time + * @throws IOException if unable to get file status + */ + private InternalRow createBranchRow(String branchName, Path tablePath) throws IOException { + String branchPath = BranchManager.branchPath(tablePath, branchName); + long creationTime = fileIO.getFileStatus(new Path(branchPath)).getModificationTime(); + return GenericRow.of( + BinaryString.fromString(branchName), + Timestamp.fromLocalDateTime(DateTimeUtils.toLocalDateTime(creationTime))); + } - List result = new ArrayList<>(); - List branches = branchManager.branches(); + private List branches(FileStoreTable table, Predicate predicate) + throws IOException { + BranchManager branchManager = table.branchManager(); Path tablePath = table.location(); - for (String branch : branches) { - String branchPath = BranchManager.branchPath(tablePath, branch); - long creationTime = - fileIO.getFileStatus(new Path(branchPath)).getModificationTime(); - result.add( - GenericRow.of( - BinaryString.fromString(branch), - Timestamp.fromLocalDateTime( - DateTimeUtils.toLocalDateTime(creationTime)))); + List result = new ArrayList<>(); + // Handle predicate filtering for branch_name + if (predicate != null) { + // Handle Equal predicate + if (predicate instanceof LeafPredicate + && ((LeafPredicate) predicate).function() instanceof Equal + && ((LeafPredicate) predicate).literals().get(0) instanceof BinaryString + && predicate.visit(LeafPredicateExtractor.INSTANCE).get(BRANCH_NAME) + != null) { + String equalValue = ((LeafPredicate) predicate).literals().get(0).toString(); + if (branchManager.branchExists(equalValue)) { + result.add(createBranchRow(equalValue, tablePath)); + } + } + + // Handle CompoundPredicate (OR case for IN filter) + if (predicate instanceof CompoundPredicate) { + CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; + if ((compoundPredicate.function()) instanceof Or) { + List branchNames = new ArrayList<>(); + InPredicateVisitor.extractInElements(predicate, BRANCH_NAME) + .ifPresent( + e -> + e.stream() + .map(Object::toString) + .forEach(branchNames::add)); + for (String branchName : branchNames) { + if (branchManager.branchExists(branchName)) { + result.add(createBranchRow(branchName, tablePath)); + } + } + } + } + } else { + // Fallback to original logic if no predicate + List branches = branchManager.branches(); + for (String branch : branches) { + result.add(createBranchRow(branch, tablePath)); + } } - return result; } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index c593db273b79..9850845fefa8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -834,6 +834,41 @@ public void testMainAndFallbackNoPrimaryKeys() throws Exception { "+I[2, 220, 2200]"); } + @Test + public void testBranchesTableFilter() throws Exception { + sql("CREATE TABLE T (a INT, b INT)"); + sql("INSERT INTO T VALUES (1, 1)"); + sql("CALL sys.create_branch('default.T', 'b1')"); + sql("CALL sys.create_branch('default.T', 'b2')"); + sql("CALL sys.create_branch('default.T', 'b3')"); + + // no filter + assertThat(collectResult("SELECT branch_name FROM `T$branches`")) + .containsExactlyInAnyOrder("+I[b1]", "+I[b2]", "+I[b3]"); + + // equals + assertThat(collectResult("SELECT branch_name FROM `T$branches` WHERE branch_name = 'b2'")) + .containsExactlyInAnyOrder("+I[b2]"); + + // in + assertThat( + collectResult( + "SELECT branch_name FROM `T$branches` WHERE branch_name IN ('b1', 'b3')")) + .containsExactlyInAnyOrder("+I[b1]", "+I[b3]"); + + // in with non-existent branch + assertThat( + collectResult( + "SELECT branch_name FROM `T$branches` WHERE branch_name IN ('b1', 'non_existent')")) + .containsExactlyInAnyOrder("+I[b1]"); + + // equals with non-existent branch + assertThat( + collectResult( + "SELECT branch_name FROM `T$branches` WHERE branch_name = 'non_existent'")) + .isEmpty(); + } + private List collectResult(String sql) throws Exception { List result = new ArrayList<>(); try (CloseableIterator it = tEnv.executeSql(sql).collect()) {