@@ -27,6 +27,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.SingleFileWriter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.types.RowType;
@@ -207,14 +208,18 @@ public boolean isCacheEnabled() {
}

public ManifestFile create() {
return create(null);
}

public ManifestFile create(List<Predicate> filters) {
RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA);
return new ManifestFile(
fileIO,
schemaManager,
partitionType,
new ManifestEntrySerializer(),
entryType,
fileFormat.createReaderFactory(entryType),
fileFormat.createReaderFactory(entryType, filters),
fileFormat.createWriterFactory(entryType),
compression,
pathFactory.manifestFileFactory(),
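
For context, a minimal sketch (not part of this patch) of how the new create(List<Predicate>) overload could be exercised. The class and helper names below are hypothetical, and it assumes the enclosing factory shown above is ManifestFile.Factory; the one-column "_BUCKET" row type mirrors the one built in AbstractFileStoreScan further down:

import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;

import java.util.Collections;
import java.util.List;

class ManifestPushDownSketch {
    // Hypothetical helper: create a ManifestFile whose format reader may skip
    // entries that cannot belong to the given bucket. The filters flow into
    // fileFormat.createReaderFactory(entryType, filters) shown above.
    static ManifestFile manifestForBucket(ManifestFile.Factory factory, int bucket) {
        PredicateBuilder builder =
                new PredicateBuilder(
                        RowType.of(new DataType[] {new IntType()}, new String[] {"_BUCKET"}));
        List<Predicate> filters = Collections.singletonList(builder.equal(0, bucket));
        return factory.create(filters);
    }
}
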
@@ -35,9 +35,13 @@
import org.apache.paimon.operation.metrics.ScanStats;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFilter;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
@@ -79,6 +83,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {

private Snapshot specifiedSnapshot = null;
private Filter<Integer> bucketFilter = null;
private Collection<Integer> buckets;
private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
protected ScanMode scanMode = ScanMode.ALL;
private Filter<Integer> levelFilter = null;
@@ -127,6 +132,14 @@ public FileStoreScan withPartitionFilter(PartitionPredicate predicate) {
@Override
public FileStoreScan withBucket(int bucket) {
this.bucketFilter = i -> i == bucket;
this.buckets = Collections.singletonList(bucket);
return this;
}

@Override
public FileStoreScan withBuckets(Collection<Integer> buckets) {
this.bucketFilter = buckets::contains;
this.buckets = buckets;
return this;
}

@@ -427,7 +440,7 @@ private List<ManifestEntry> readManifest(
@Nullable Filter<ManifestEntry> additionalTFilter) {
List<ManifestEntry> entries =
manifestFileFactory
.create()
.create(createPushDownFilter(buckets))
.withCacheMetrics(
scanMetrics != null ? scanMetrics.getCacheMetrics() : null)
.read(
@@ -471,6 +484,22 @@ private Filter<InternalRow> createCacheRowFilter() {
return row -> manifestCacheFilter.test(partitionGetter.apply(row), bucketGetter.apply(row));
}

/**
 * Convert the currently required buckets into predicates on the bucket field, so that the
 * bucket filter can be pushed down into the file format reader when manifest entries are read.
 */
private static List<Predicate> createPushDownFilter(Collection<Integer> buckets) {
if (buckets == null || buckets.isEmpty()) {
return null;
}
List<Predicate> predicates = new ArrayList<>();
PredicateBuilder predicateBuilder =
new PredicateBuilder(
RowType.of(new DataType[] {new IntType()}, new String[] {"_BUCKET"}));
predicates.add(predicateBuilder.in(0, new ArrayList<>(buckets)));
return predicates;
}
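
// A hedged aside, not part of this patch: for buckets {1, 3} the method above
// yields a single predicate equivalent to "_BUCKET IN (1, 3)", i.e.
//
//   PredicateBuilder builder =
//           new PredicateBuilder(
//                   RowType.of(new DataType[] {new IntType()}, new String[] {"_BUCKET"}));
//   Predicate bucketIn = builder.in(0, new ArrayList<>(Arrays.asList(1, 3)));
//
// so a reader created with this filter can skip rows (or, for formats that
// keep statistics, whole row groups) whose bucket id is neither 1 nor 3.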

/**
* Read the corresponding entries based on the current required partition and bucket.
*
@@ -38,6 +38,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -57,6 +58,8 @@ public interface FileStoreScan {

FileStoreScan withBucket(int bucket);

FileStoreScan withBuckets(Collection<Integer> buckets);

FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);

FileStoreScan withTotalAwareBucketFilter(BiFilter<Integer, Integer> bucketFilter);
@@ -33,6 +33,7 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
@@ -43,6 +44,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -265,6 +267,13 @@ public Scan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

@Override
public InnerTableScan withBuckets(Collection<Integer> buckets) {
mainScan.withBuckets(buckets);
fallbackScan.withBuckets(buckets);
return this;
}

@Override
public Scan withLevelFilter(Filter<Integer> levelFilter) {
mainScan.withLevelFilter(levelFilter);
@@ -48,6 +48,7 @@
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -79,6 +80,12 @@ public AbstractDataTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

@Override
public AbstractDataTableScan withBuckets(Collection<Integer> buckets) {
snapshotReader.withBuckets(buckets);
return this;
}

@Override
public AbstractDataTableScan withPartitionFilter(Map<String, String> partitionSpec) {
snapshotReader.withPartitionFilter(partitionSpec);
@@ -23,6 +23,8 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.utils.Filter;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@@ -47,6 +49,15 @@ default InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

default InnerTableScan withBucket(Integer bucket) {
return withBuckets(Collections.singletonList(bucket));
}

default InnerTableScan withBuckets(Collection<Integer> buckets) {
// Returning this is not entirely safe: many implementations override neither this
// method nor withBucketFilter, so the bucket restriction would be silently ignored.
return this;
}

default InnerTableScan withLevelFilter(Filter<Integer> levelFilter) {
return this;
}
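
A minimal usage sketch of the two defaults above (hypothetical class and method names; any scan that should actually honor the hint must override withBuckets):

import org.apache.paimon.table.source.InnerTableScan;

import java.util.List;

class BucketRestrictionSketch {
    // withBucket(Integer) is sugar: it funnels into
    // withBuckets(Collections.singletonList(bucket)), so implementations only
    // need to override the plural variant.
    static InnerTableScan restrictTo(InnerTableScan scan, List<Integer> buckets) {
        return buckets.size() == 1
                ? scan.withBucket(buckets.get(0))
                : scan.withBuckets(buckets);
    }
}

Defaulting withBuckets to return this keeps existing implementations compiling, at the cost of silently dropping the restriction, which is exactly what the warning comment calls out.
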
@@ -39,6 +39,7 @@

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -83,6 +84,8 @@ public interface SnapshotReader {

SnapshotReader withBucket(int bucket);

SnapshotReader withBuckets(Collection<Integer> buckets);

SnapshotReader withBucketFilter(Filter<Integer> bucketFilter);

SnapshotReader withDataFileNameFilter(Filter<String> fileNameFilter);
@@ -54,6 +54,7 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -253,6 +254,11 @@ public SnapshotReader withBucket(int bucket) {
return this;
}

public SnapshotReader withBuckets(Collection<Integer> buckets) {
scan.withBuckets(buckets);
return this;
}

@Override
public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
scan.withBucketFilter(bucketFilter);
@@ -285,7 +291,13 @@ public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
Math.abs(file.hashCode() % numberOfParallelSubtasks)
== indexOfThisSubtask);
} else {
withBucketFilter(bucket -> bucket % numberOfParallelSubtasks == indexOfThisSubtask);
Set<Integer> buckets = new HashSet<>();
for (int bucket = 0; bucket < this.tableSchema.numBuckets(); bucket++) {
if (bucket % numberOfParallelSubtasks == indexOfThisSubtask) {
buckets.add(bucket);
}
}
withBuckets(buckets);
}
return this;
}
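
The new loop in withShard materializes the shard's buckets explicitly. A self-contained sketch of the same arithmetic (numBuckets stands in for tableSchema.numBuckets(); the class and method names are illustrative):

import java.util.HashSet;
import java.util.Set;

class ShardBucketsSketch {
    // Each subtask claims the buckets congruent to its index modulo the
    // parallelism, e.g. numBuckets = 5 with parallelism = 2 gives
    // subtask 0 -> {0, 2, 4} and subtask 1 -> {1, 3}.
    static Set<Integer> bucketsForSubtask(int numBuckets, int parallelism, int subtask) {
        Set<Integer> buckets = new HashSet<>();
        for (int bucket = 0; bucket < numBuckets; bucket++) {
            if (bucket % parallelism == subtask) {
                buckets.add(bucket);
            }
        }
        return buckets;
    }
}

An explicit set, unlike the Filter<Integer> lambda it replaces, can be rewritten into the pushed-down _BUCKET IN (...) predicate; an opaque filter can only be evaluated after manifest entries have already been read.
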
@@ -70,6 +70,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -342,6 +343,12 @@ public SnapshotReader withBucket(int bucket) {
return this;
}

@Override
public SnapshotReader withBuckets(Collection<Integer> buckets) {
wrapped.withBuckets(buckets);
return this;
}

@Override
public SnapshotReader withBucketFilter(Filter<Integer> bucketFilter) {
wrapped.withBucketFilter(bucketFilter);
@@ -452,6 +459,12 @@ public InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

@Override
public InnerTableScan withBuckets(Collection<Integer> buckets) {
batchScan.withBuckets(buckets);
return this;
}

@Override
public InnerTableScan withLevelFilter(Filter<Integer> levelFilter) {
batchScan.withLevelFilter(levelFilter);
@@ -489,6 +502,18 @@ public StreamDataTableScan withFilter(Predicate predicate) {
return this;
}

@Override
public InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
streamScan.withBucketFilter(bucketFilter);
return this;
}

@Override
public InnerTableScan withBuckets(Collection<Integer> buckets) {
streamScan.withBuckets(buckets);
return this;
}

@Override
public StartingContext startingContext() {
return streamScan.startingContext();
@@ -39,6 +39,7 @@
import org.junit.jupiter.api.io.TempDir;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -224,6 +225,30 @@ public void testWithBucket() throws Exception {
runTestExactMatch(scan, snapshot.id(), expected);
}

@Test
public void testWithBuckets() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
List<KeyValue> data = generateData(random.nextInt(1000) + 1);
Snapshot snapshot = writeData(data);

int wantedBucket1 = random.nextInt(NUM_BUCKETS);
int wantedBucket2 = random.nextInt(NUM_BUCKETS);
int wantedBucket3 = random.nextInt(NUM_BUCKETS);
Set<Integer> buckets =
new HashSet<>(Arrays.asList(wantedBucket1, wantedBucket2, wantedBucket3));

FileStoreScan scan = store.newScan();
scan.withSnapshot(snapshot.id());
scan.withBuckets(buckets);

Map<BinaryRow, BinaryRow> expected =
store.toKvMap(
data.stream()
.filter(kv -> buckets.contains(getBucket(kv)))
.collect(Collectors.toList()));
runTestExactMatch(scan, snapshot.id(), expected);
}

@Test
public void testWithSnapshot() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
8 changes: 8 additions & 0 deletions paimon-flink/paimon-flink-1.18/pom.xml
@@ -80,6 +80,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-format</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
9 changes: 9 additions & 0 deletions paimon-flink/paimon-flink-1.19/pom.xml
@@ -87,6 +87,15 @@ under the License.
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-format</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>