Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.InPredicateVisitor;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -48,6 +54,8 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
Expand All @@ -67,11 +75,12 @@ public class BranchesTable implements ReadonlyTable {

public static final String BRANCHES = "branches";

private static final String BRANCH_NAME = "branch_name";

public static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
new DataField(
0, "branch_name", SerializationUtils.newStringType(false)),
new DataField(0, BRANCH_NAME, SerializationUtils.newStringType(false)),
new DataField(1, "create_time", new TimestampType(false, 3))));

private final FileIO fileIO;
Expand All @@ -97,7 +106,7 @@ public RowType rowType() {

@Override
public List<String> primaryKeys() {
return Collections.singletonList("branch_name");
return Collections.singletonList(BRANCH_NAME);
}

@Override
Expand All @@ -121,16 +130,21 @@ public Table copy(Map<String, String> dynamicOptions) {
}

private class BranchesScan extends ReadOnceTableScan {
private @Nullable Predicate branchPredicate;

@Override
public InnerTableScan withFilter(Predicate predicate) {
// TODO
if (predicate == null) {
return this;
}
branchPredicate = predicate;

return this;
}

@Override
public Plan innerPlan() {
return () -> Collections.singletonList(new BranchesSplit(location));
return () -> Collections.singletonList(new BranchesSplit(location, branchPredicate));
}
}

Expand All @@ -139,8 +153,11 @@ private static class BranchesSplit extends SingletonSplit {

private final Path location;

private BranchesSplit(Path location) {
private final @Nullable Predicate branchPredicate;

private BranchesSplit(Path location, @Nullable Predicate branchPredicate) {
this.location = location;
this.branchPredicate = branchPredicate;
}

@Override
Expand All @@ -152,7 +169,8 @@ public boolean equals(Object o) {
return false;
}
BranchesSplit that = (BranchesSplit) o;
return Objects.equals(location, that.location);
return Objects.equals(location, that.location)
&& Objects.equals(branchPredicate, that.branchPredicate);
}

@Override
Expand Down Expand Up @@ -194,10 +212,11 @@ public RecordReader<InternalRow> createReader(Split split) {
}

Path location = ((BranchesSplit) split).location;
Predicate predicate = ((BranchesSplit) split).branchPredicate;
FileStoreTable table = FileStoreTableFactory.create(fileIO, location);
Iterator<InternalRow> rows;
try {
rows = branches(table).iterator();
rows = branches(table, predicate).iterator();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand All @@ -214,23 +233,66 @@ public RecordReader<InternalRow> createReader(Split split) {
return new IteratorRecordReader<>(rows);
}

private List<InternalRow> branches(FileStoreTable table) throws IOException {
BranchManager branchManager = table.branchManager();
/**
* Creates an InternalRow for a branch with its creation time.
*
* @param branchName the name of the branch
* @param tablePath the table path
* @return InternalRow containing branch name and creation time
* @throws IOException if unable to get file status
*/
private InternalRow createBranchRow(String branchName, Path tablePath) throws IOException {
String branchPath = BranchManager.branchPath(tablePath, branchName);
long creationTime = fileIO.getFileStatus(new Path(branchPath)).getModificationTime();
return GenericRow.of(
BinaryString.fromString(branchName),
Timestamp.fromLocalDateTime(DateTimeUtils.toLocalDateTime(creationTime)));
}

List<InternalRow> result = new ArrayList<>();
List<String> branches = branchManager.branches();
private List<InternalRow> branches(FileStoreTable table, Predicate predicate)
throws IOException {
BranchManager branchManager = table.branchManager();
Path tablePath = table.location();
for (String branch : branches) {
String branchPath = BranchManager.branchPath(tablePath, branch);
long creationTime =
fileIO.getFileStatus(new Path(branchPath)).getModificationTime();
result.add(
GenericRow.of(
BinaryString.fromString(branch),
Timestamp.fromLocalDateTime(
DateTimeUtils.toLocalDateTime(creationTime))));
List<InternalRow> result = new ArrayList<>();
// Handle predicate filtering for branch_name
if (predicate != null) {
// Handle Equal predicate
if (predicate instanceof LeafPredicate
&& ((LeafPredicate) predicate).function() instanceof Equal
&& ((LeafPredicate) predicate).literals().get(0) instanceof BinaryString
&& predicate.visit(LeafPredicateExtractor.INSTANCE).get(BRANCH_NAME)
!= null) {
String equalValue = ((LeafPredicate) predicate).literals().get(0).toString();
if (branchManager.branchExists(equalValue)) {
result.add(createBranchRow(equalValue, tablePath));
}
}

// Handle CompoundPredicate (OR case for IN filter)
if (predicate instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
if ((compoundPredicate.function()) instanceof Or) {
List<String> branchNames = new ArrayList<>();
InPredicateVisitor.extractInElements(predicate, BRANCH_NAME)
.ifPresent(
e ->
e.stream()
.map(Object::toString)
.forEach(branchNames::add));
for (String branchName : branchNames) {
if (branchManager.branchExists(branchName)) {
result.add(createBranchRow(branchName, tablePath));
}
}
}
}
} else {
// Fallback to original logic if no predicate
List<String> branches = branchManager.branches();
for (String branch : branches) {
result.add(createBranchRow(branch, tablePath));
}
}

return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,41 @@ public void testMainAndFallbackNoPrimaryKeys() throws Exception {
"+I[2, 220, 2200]");
}

@Test
public void testBranchesTableFilter() throws Exception {
sql("CREATE TABLE T (a INT, b INT)");
sql("INSERT INTO T VALUES (1, 1)");
sql("CALL sys.create_branch('default.T', 'b1')");
sql("CALL sys.create_branch('default.T', 'b2')");
sql("CALL sys.create_branch('default.T', 'b3')");

// no filter
assertThat(collectResult("SELECT branch_name FROM `T$branches`"))
.containsExactlyInAnyOrder("+I[b1]", "+I[b2]", "+I[b3]");

// equals
assertThat(collectResult("SELECT branch_name FROM `T$branches` WHERE branch_name = 'b2'"))
.containsExactlyInAnyOrder("+I[b2]");

// in
assertThat(
collectResult(
"SELECT branch_name FROM `T$branches` WHERE branch_name IN ('b1', 'b3')"))
.containsExactlyInAnyOrder("+I[b1]", "+I[b3]");

// in with non-existent branch
assertThat(
collectResult(
"SELECT branch_name FROM `T$branches` WHERE branch_name IN ('b1', 'non_existent')"))
.containsExactlyInAnyOrder("+I[b1]");

// equals with non-existent branch
assertThat(
collectResult(
"SELECT branch_name FROM `T$branches` WHERE branch_name = 'non_existent'"))
.isEmpty();
}

private List<String> collectResult(String sql) throws Exception {
List<String> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
Expand Down
Loading