diff --git a/docs/content/maintenance/metrics.md b/docs/content/maintenance/metrics.md
index 139bdfff6bfe..2c3067267fd7 100644
--- a/docs/content/maintenance/metrics.md
+++ b/docs/content/maintenance/metrics.md
@@ -67,16 +67,6 @@ Below is lists of Paimon built-in metrics. They are summarized into types of sca
Gauge |
Number of scanned manifest files in the last scan. |
-
- | lastSkippedByPartitionAndStats |
- Gauge |
- Skipped table files by partition filter and value / key stats information in the last scan. |
-
-
- | lastSkippedByWholeBucketFilesFilter |
- Gauge |
- Skipped table files by bucket level value filter (only primary key table) in the last scan. |
-
| lastScanSkippedTableFiles |
Gauge |
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java b/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
index 2764bc77363a..4d9416252e0f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
@@ -37,6 +37,13 @@ public interface Filter {
*/
boolean test(T t);
+ default Filter and(Filter super T> other) {
+ if (other == null) {
+ return this;
+ }
+ return t -> test(t) && other.test(t);
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
static Filter alwaysTrue() {
return (Filter) ALWAYS_TRUE;
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
index 02b5d73fcf2c..112b9ad1cda2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
@@ -110,7 +110,9 @@ private void advanceIfNeeded() {
if (stack.isEmpty()) {
return;
}
- activeList = randomlyExecute(executor, processor, stack.poll());
+ activeList =
+ randomlyExecuteSequentialReturn(
+ executor, processor, stack.poll());
}
}
}
@@ -132,7 +134,7 @@ public static void randomlyOnlyExecute(
awaitAllFutures(futures);
}
- public static Iterator randomlyExecute(
+ public static Iterator randomlyExecuteSequentialReturn(
ExecutorService executor, Function> processor, Collection input) {
List>> futures = new ArrayList<>(input.size());
ClassLoader cl = Thread.currentThread().getContextClassLoader();
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
index 842b223167b2..577f28d0f5cf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
@@ -27,6 +27,7 @@
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
@@ -441,7 +442,12 @@ public ManifestEntry next() {
}
if (currentIterator.hasNext()) {
- return currentIterator.next();
+ ManifestEntry entry = currentIterator.next();
+ if (entry.kind() == FileKind.DELETE) {
+ continue;
+ } else {
+ return entry;
+ }
}
currentIterator = null;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 3b3e514e0a1e..91e07a369da2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -28,15 +28,17 @@
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
/** Entry representing a file. */
@@ -214,7 +216,11 @@ static Set readDeletedEntries(
return readDeletedEntries(
m ->
manifestFile.read(
- m.fileName(), m.fileSize(), Filter.alwaysTrue(), deletedFilter()),
+ m.fileName(),
+ m.fileSize(),
+ Filter.alwaysTrue(),
+ deletedFilter(),
+ Filter.alwaysTrue()),
manifestFiles,
manifestReadParallelism);
}
@@ -234,11 +240,11 @@ static Set readDeletedEntries(
.filter(e -> e.kind() == FileKind.DELETE)
.map(FileEntry::identifier)
.collect(Collectors.toList());
- Iterable identifiers =
- sequentialBatchedExecute(processor, manifestFiles, manifestReadParallelism);
- Set result = new HashSet<>();
- for (Identifier identifier : identifiers) {
- result.add(identifier);
+ Iterator identifiers =
+ randomlyExecuteSequentialReturn(processor, manifestFiles, manifestReadParallelism);
+ Set result = ConcurrentHashMap.newKeySet();
+ while (identifiers.hasNext()) {
+ result.add(identifiers.next());
}
return result;
}
@@ -247,4 +253,9 @@ static Filter deletedFilter() {
Function getter = ManifestEntrySerializer.kindGetter();
return row -> getter.apply(row) == FileKind.DELETE;
}
+
+ static Filter addFilter() {
+ Function getter = ManifestEntrySerializer.kindGetter();
+ return row -> getter.apply(row) == FileKind.ADD;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java
new file mode 100644
index 000000000000..29ae6f6389c6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FilteredManifestEntry.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+/** Wrap a {@link ManifestEntry} to contain {@link #selected}. */
+public class FilteredManifestEntry extends ManifestEntry {
+
+ private final boolean selected;
+
+ public FilteredManifestEntry(ManifestEntry entry, boolean selected) {
+ super(entry.kind(), entry.partition(), entry.bucket(), entry.totalBuckets(), entry.file());
+ this.selected = selected;
+ }
+
+ public boolean selected() {
+ return selected;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 38181a8234aa..128f5262a553 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -211,18 +211,5 @@ public ManifestFile create() {
suggestedFileSize,
cache);
}
-
- public ObjectsFile createSimpleFileEntryReader() {
- RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA);
- return new ObjectsFile<>(
- fileIO,
- new SimpleFileEntrySerializer(),
- entryType,
- fileFormat.createReaderFactory(entryType),
- fileFormat.createWriterFactory(entryType),
- compression,
- pathFactory.manifestFileFactory(),
- cache);
- }
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 0e1f9357e312..98e064451509 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -23,7 +23,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.manifest.BucketEntry;
import org.apache.paimon.manifest.FileEntry;
-import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileEntry.Identifier;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
@@ -43,8 +43,6 @@
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
-
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -62,6 +60,7 @@
import java.util.stream.Collectors;
import static org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
+import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -193,6 +192,11 @@ public FileStoreScan withLevelFilter(Filter levelFilter) {
return this;
}
+ @Override
+ public FileStoreScan enableValueFilter() {
+ return this;
+ }
+
@Override
public FileStoreScan withManifestEntryFilter(Filter filter) {
this.manifestEntryFilter = filter;
@@ -241,47 +245,46 @@ public Plan plan() {
Snapshot snapshot = manifestsResult.snapshot;
List manifests = manifestsResult.filteredManifests;
- long startDataFiles =
- manifestsResult.allManifests.stream()
- .mapToLong(f -> f.numAddedFiles() - f.numDeletedFiles())
- .sum();
-
- Collection mergedEntries =
- readAndMergeFileEntries(manifests, this::readManifest);
-
- long skippedByPartitionAndStats = startDataFiles - mergedEntries.size();
-
- // We group files by bucket here, and filter them by the whole bucket filter.
- // Why do this: because in primary key table, we can't just filter the value
- // by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
- // but we can do this by filter the whole bucket files
- List files =
- mergedEntries.stream()
- .collect(
- Collectors.groupingBy(
- // we use LinkedHashMap to avoid disorder
- file -> Pair.of(file.partition(), file.bucket()),
- LinkedHashMap::new,
- Collectors.toList()))
- .values()
- .stream()
- .map(this::filterWholeBucketByStats)
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ Iterator iterator = readManifestEntries(manifests, false);
+ List files = new ArrayList<>();
+ while (iterator.hasNext()) {
+ files.add(iterator.next());
+ }
+
+ if (wholeBucketFilterEnabled()) {
+ // We group files by bucket here, and filter them by the whole bucket filter.
+ // Why do this: because in primary key table, we can't just filter the value
+ // by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
+ // but we can do this by filter the whole bucket files
+ files =
+ files.stream()
+ .collect(
+ Collectors.groupingBy(
+ // we use LinkedHashMap to avoid disorder
+ file -> Pair.of(file.partition(), file.bucket()),
+ LinkedHashMap::new,
+ Collectors.toList()))
+ .values()
+ .stream()
+ .map(this::filterWholeBucketByStats)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+
+ List result = files;
- long skippedByWholeBucketFiles = mergedEntries.size() - files.size();
long scanDuration = (System.nanoTime() - started) / 1_000_000;
- checkState(
- startDataFiles - skippedByPartitionAndStats - skippedByWholeBucketFiles
- == files.size());
if (scanMetrics != null) {
+ long allDataFiles =
+ manifestsResult.allManifests.stream()
+ .mapToLong(f -> f.numAddedFiles() - f.numDeletedFiles())
+ .sum();
scanMetrics.reportScan(
new ScanStats(
scanDuration,
manifests.size(),
- skippedByPartitionAndStats,
- skippedByWholeBucketFiles,
- files.size()));
+ allDataFiles - result.size(),
+ result.size()));
}
return new Plan() {
@@ -299,12 +302,7 @@ public Snapshot snapshot() {
@Override
public List files() {
- if (dropStats) {
- return files.stream()
- .map(ManifestEntry::copyWithoutStats)
- .collect(Collectors.toList());
- }
- return files;
+ return result;
}
};
}
@@ -312,9 +310,15 @@ public List files() {
@Override
public List readSimpleEntries() {
List manifests = readManifests().filteredManifests;
- Collection mergedEntries =
- readAndMergeFileEntries(manifests, this::readSimpleEntries);
- return new ArrayList<>(mergedEntries);
+ Iterator iterator =
+ scanMode == ScanMode.ALL
+ ? readAndMergeFileEntries(manifests, SimpleFileEntry::from, false)
+ : readAndNoMergeFileEntries(manifests, SimpleFileEntry::from, false);
+ List result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ return result;
}
@Override
@@ -343,23 +347,57 @@ public List readBucketEntries() {
@Override
public Iterator readFileIterator() {
- List manifests = readManifests().filteredManifests;
- Set deleteEntries =
- FileEntry.readDeletedEntries(this::readSimpleEntries, manifests, parallelism);
- Iterator iterator =
- sequentialBatchedExecute(this::readManifest, manifests, parallelism).iterator();
- return Iterators.filter(
- iterator,
- entry ->
- entry != null
- && entry.kind() == FileKind.ADD
- && !deleteEntries.contains(entry.identifier()));
+ // useSequential: reduce memory and iterator can be stopping
+ return readManifestEntries(readManifests().filteredManifests, true);
+ }
+
+ private Iterator readManifestEntries(
+ List manifests, boolean useSequential) {
+ return scanMode == ScanMode.ALL
+ ? readAndMergeFileEntries(manifests, Function.identity(), useSequential)
+ : readAndNoMergeFileEntries(manifests, Function.identity(), useSequential);
+ }
+
+ private Iterator readAndMergeFileEntries(
+ List manifests,
+ Function, List> converter,
+ boolean useSequential) {
+ Set deletedEntries =
+ FileEntry.readDeletedEntries(
+ manifest -> readManifest(manifest, FileEntry.deletedFilter(), null),
+ manifests,
+ parallelism);
+
+ manifests =
+ manifests.stream()
+ .filter(file -> file.numAddedFiles() > 0)
+ .collect(Collectors.toList());
+
+ Function> processor =
+ manifest ->
+ converter.apply(
+ readManifest(
+ manifest,
+ FileEntry.addFilter(),
+ entry -> !deletedEntries.contains(entry.identifier())));
+ if (useSequential) {
+ return sequentialBatchedExecute(processor, manifests, parallelism).iterator();
+ } else {
+ return randomlyExecuteSequentialReturn(processor, manifests, parallelism);
+ }
}
- public Collection readAndMergeFileEntries(
- List manifests, Function> manifestReader) {
- return FileEntry.mergeEntries(
- sequentialBatchedExecute(manifestReader, manifests, parallelism));
+ private Iterator readAndNoMergeFileEntries(
+ List manifests,
+ Function, List> converter,
+ boolean useSequential) {
+ Function> reader =
+ manifest -> converter.apply(readManifest(manifest));
+ if (useSequential) {
+ return sequentialBatchedExecute(reader, manifests, parallelism).iterator();
+ } else {
+ return randomlyExecuteSequentialReturn(reader, manifests, parallelism);
+ }
}
private ManifestsReader.Result readManifests() {
@@ -384,12 +422,24 @@ protected TableSchema scanTableSchema(long id) {
/** Note: Keep this thread-safe. */
protected abstract boolean filterByStats(ManifestEntry entry);
- /** Note: Keep this thread-safe. */
- protected abstract List filterWholeBucketByStats(List entries);
+ protected boolean wholeBucketFilterEnabled() {
+ return false;
+ }
+
+ protected List filterWholeBucketByStats(List entries) {
+ return entries;
+ }
/** Note: Keep this thread-safe. */
@Override
public List readManifest(ManifestFileMeta manifest) {
+ return readManifest(manifest, null, null);
+ }
+
+ private List readManifest(
+ ManifestFileMeta manifest,
+ @Nullable Filter additionalFilter,
+ @Nullable Filter additionalTFilter) {
List entries =
manifestFileFactory
.create()
@@ -397,29 +447,24 @@ public List readManifest(ManifestFileMeta manifest) {
manifest.fileName(),
manifest.fileSize(),
createCacheRowFilter(),
- createEntryRowFilter());
- List filteredEntries = new ArrayList<>(entries.size());
- for (ManifestEntry entry : entries) {
- if ((manifestEntryFilter == null || manifestEntryFilter.test(entry))
- && filterByStats(entry)) {
- filteredEntries.add(entry);
+ createEntryRowFilter().and(additionalFilter),
+ entry ->
+ (additionalTFilter == null || additionalTFilter.test(entry))
+ && (manifestEntryFilter == null
+ || manifestEntryFilter.test(entry))
+ && filterByStats(entry));
+ if (dropStats) {
+ List copied = new ArrayList<>(entries.size());
+ for (ManifestEntry entry : entries) {
+ copied.add(dropStats(entry));
}
+ entries = copied;
}
- return filteredEntries;
+ return entries;
}
- /** Note: Keep this thread-safe. */
- private List readSimpleEntries(ManifestFileMeta manifest) {
- return manifestFileFactory
- .createSimpleFileEntryReader()
- .read(
- manifest.fileName(),
- manifest.fileSize(),
- // use filter for ManifestEntry
- // currently, projection is not pushed down to file format
- // see SimpleFileEntrySerializer
- createCacheRowFilter(),
- createEntryRowFilter());
+ protected ManifestEntry dropStats(ManifestEntry entry) {
+ return entry.copyWithoutStats();
}
/**
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 60b4e7933cb1..d2ca5da42249 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -34,7 +34,6 @@
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
/** {@link FileStoreScan} for {@link AppendOnlyFileStore}. */
@@ -100,12 +99,6 @@ protected boolean filterByStats(ManifestEntry entry) {
&& (!fileIndexReadEnabled || testFileIndex(entry.file().embeddedIndex(), entry));
}
- @Override
- protected List filterWholeBucketByStats(List entries) {
- // We don't need to filter per-bucket entries here
- return entries;
- }
-
private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry) {
if (embeddedIndexBytes == null) {
return true;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index e643bf1617b4..7663f48229c6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -73,6 +73,8 @@ public interface FileStoreScan {
FileStoreScan withLevelFilter(Filter levelFilter);
+ FileStoreScan enableValueFilter();
+
FileStoreScan withManifestEntryFilter(Filter filter);
FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index c368d9e510b0..8d8c51996cfe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -23,6 +23,7 @@
import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.fileindex.FileIndexPredicate;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FilteredManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.predicate.Predicate;
@@ -45,7 +46,6 @@
import java.util.Map;
import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
-import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
/** {@link FileStoreScan} for {@link KeyValueFileStore}. */
@@ -64,6 +64,8 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
private final boolean fileIndexReadEnabled;
private final Map schemaId2DataFilter = new HashMap<>();
+ private boolean valueFilterForceEnabled = false;
+
public KeyValueFileStoreScan(
ManifestsReader manifestsReader,
BucketSelectConverter bucketSelectConverter,
@@ -110,11 +112,17 @@ public KeyValueFileStoreScan withValueFilter(Predicate predicate) {
return this;
}
+ @Override
+ public FileStoreScan enableValueFilter() {
+ this.valueFilterForceEnabled = true;
+ return this;
+ }
+
/** Note: Keep this thread-safe. */
@Override
protected boolean filterByStats(ManifestEntry entry) {
DataFileMeta file = entry.file();
- if (isValueFilterEnabled(entry) && !filterByValueFilter(entry)) {
+ if (isValueFilterEnabled() && !filterByValueFilter(entry)) {
return false;
}
@@ -130,6 +138,14 @@ protected boolean filterByStats(ManifestEntry entry) {
return true;
}
+ @Override
+ protected ManifestEntry dropStats(ManifestEntry entry) {
+ if (!isValueFilterEnabled() && wholeBucketFilterEnabled()) {
+ return new FilteredManifestEntry(entry.copyWithoutStats(), filterByValueFilter(entry));
+ }
+ return entry.copyWithoutStats();
+ }
+
private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry) {
if (embeddedIndexBytes == null) {
return true;
@@ -150,14 +166,14 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestE
}
}
- private boolean isValueFilterEnabled(ManifestEntry entry) {
+ private boolean isValueFilterEnabled() {
if (valueFilter == null) {
return false;
}
switch (scanMode) {
case ALL:
- return (deletionVectorsEnabled || mergeEngine == FIRST_ROW) && entry.level() > 0;
+ return valueFilterForceEnabled;
case DELTA:
return false;
case CHANGELOG:
@@ -168,13 +184,13 @@ private boolean isValueFilterEnabled(ManifestEntry entry) {
}
}
- /** Note: Keep this thread-safe. */
@Override
- protected List filterWholeBucketByStats(List entries) {
- if (valueFilter == null || scanMode != ScanMode.ALL) {
- return entries;
- }
+ protected boolean wholeBucketFilterEnabled() {
+ return valueFilter != null && scanMode == ScanMode.ALL;
+ }
+ @Override
+ protected List filterWholeBucketByStats(List entries) {
return noOverlapping(entries)
? filterWholeBucketPerFile(entries)
: filterWholeBucketAllFiles(entries);
@@ -207,6 +223,10 @@ private List filterWholeBucketAllFiles(List entrie
}
private boolean filterByValueFilter(ManifestEntry entry) {
+ if (entry instanceof FilteredManifestEntry) {
+ return ((FilteredManifestEntry) entry).selected();
+ }
+
DataFileMeta file = entry.file();
SimpleStatsEvolution.Result result =
fieldValueStatsConverters
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 3ee108c10359..a5eea6d650cf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -53,7 +53,7 @@
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
-import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
+import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
/**
@@ -180,7 +180,7 @@ private Map getCandidateDeletingFiles() {
.filter(this::oldEnough)
.map(FileStatus::getPath)
.collect(Collectors.toList());
- Iterator allPaths = randomlyExecute(executor, processor, fileDirs);
+ Iterator allPaths = randomlyExecuteSequentialReturn(executor, processor, fileDirs);
Map result = new HashMap<>();
while (allPaths.hasNext()) {
Path next = allPaths.next();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
index 9fcbb8960fc5..96f0aec1c0b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
@@ -49,12 +49,6 @@ public MetricGroup getMetricGroup() {
public static final String SCAN_DURATION = "scanDuration";
public static final String LAST_SCANNED_MANIFESTS = "lastScannedManifests";
- public static final String LAST_SKIPPED_BY_PARTITION_AND_STATS =
- "lastSkippedByPartitionAndStats";
-
- public static final String LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER =
- "lastSkippedByWholeBucketFilesFilter";
-
public static final String LAST_SCAN_SKIPPED_TABLE_FILES = "lastScanSkippedTableFiles";
public static final String LAST_SCAN_RESULTED_TABLE_FILES = "lastScanResultedTableFiles";
@@ -66,12 +60,6 @@ private void registerGenericScanMetrics() {
metricGroup.gauge(
LAST_SCANNED_MANIFESTS,
() -> latestScan == null ? 0L : latestScan.getScannedManifests());
- metricGroup.gauge(
- LAST_SKIPPED_BY_PARTITION_AND_STATS,
- () -> latestScan == null ? 0L : latestScan.getSkippedByPartitionAndStats());
- metricGroup.gauge(
- LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER,
- () -> latestScan == null ? 0L : latestScan.getSkippedByWholeBucketFiles());
metricGroup.gauge(
LAST_SCAN_SKIPPED_TABLE_FILES,
() -> latestScan == null ? 0L : latestScan.getSkippedTableFiles());
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
index e760282e687a..700619c3680f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java
@@ -25,23 +25,15 @@ public class ScanStats {
// the unit is milliseconds
private final long duration;
private final long scannedManifests;
- private final long skippedByPartitionAndStats;
- private final long skippedByWholeBucketFiles;
private final long skippedTableFiles;
private final long resultedTableFiles;
public ScanStats(
- long duration,
- long scannedManifests,
- long skippedByPartitionAndStats,
- long skippedByWholeBucketFiles,
- long resultedTableFiles) {
+ long duration, long scannedManifests, long skippedTableFiles, long resultedTableFiles) {
this.duration = duration;
this.scannedManifests = scannedManifests;
- this.skippedByPartitionAndStats = skippedByPartitionAndStats;
- this.skippedByWholeBucketFiles = skippedByWholeBucketFiles;
- this.skippedTableFiles = skippedByPartitionAndStats + skippedByWholeBucketFiles;
+ this.skippedTableFiles = skippedTableFiles;
this.resultedTableFiles = resultedTableFiles;
}
@@ -60,16 +52,6 @@ protected long getResultedTableFiles() {
return resultedTableFiles;
}
- @VisibleForTesting
- protected long getSkippedByPartitionAndStats() {
- return skippedByPartitionAndStats;
- }
-
- @VisibleForTesting
- protected long getSkippedByWholeBucketFiles() {
- return skippedByWholeBucketFiles;
- }
-
@VisibleForTesting
protected long getDuration() {
return duration;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index b4f8fa47dbb1..73c55942a56a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -68,7 +68,7 @@
import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
import static org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
import static org.apache.paimon.utils.Preconditions.checkState;
-import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
+import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
/** An abstraction layer above {@link FileStoreCommit} to provide snapshot commit and expiration. */
public class TableCommitImpl implements InnerTableCommit {
@@ -292,7 +292,7 @@ private void checkFilesExistence(List committables) {
List nonExistFiles =
Lists.newArrayList(
- randomlyExecute(
+ randomlyExecuteSequentialReturn(
getExecutorService(null),
f -> nonExists.test(f) ? singletonList(f) : emptyList(),
files));
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index d3e8a2adb697..635802cc9dcb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -51,7 +51,7 @@ public DataTableBatchScan(
this.hasNext = true;
this.defaultValueAssigner = defaultValueAssigner;
if (pkTable && (options.deletionVectorsEnabled() || options.mergeEngine() == FIRST_ROW)) {
- snapshotReader.withLevelFilter(level -> level > 0);
+ snapshotReader.withLevelFilter(level -> level > 0).enableValueFilter();
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 358d86cbe948..9bfb54f2cf60 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -31,7 +31,6 @@
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
@@ -50,6 +49,7 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;
+import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** {@link StartingScanner} for incremental changes by snapshot. */
@@ -84,7 +84,7 @@ public Result scan(SnapshotReader reader) {
.collect(Collectors.toList());
Iterator manifests =
- ManifestReadThreadPool.randomlyExecute(
+ randomlyExecuteSequentialReturn(
id -> {
Snapshot snapshot = snapshotManager.snapshot(id);
switch (scanMode) {
@@ -111,7 +111,7 @@ public Result scan(SnapshotReader reader) {
reader.parallelism());
Iterator entries =
- ManifestReadThreadPool.randomlyExecute(
+ randomlyExecuteSequentialReturn(
reader::readManifest, Lists.newArrayList(manifests), reader.parallelism());
while (entries.hasNext()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index b59cf98bbb4c..f3e0a92b8fc7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -77,6 +77,8 @@ public interface SnapshotReader {
SnapshotReader withLevelFilter(Filter levelFilter);
+ SnapshotReader enableValueFilter();
+
SnapshotReader withManifestEntryFilter(Filter filter);
SnapshotReader withBucket(int bucket);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 7ce537ee52ec..ce01bdba9447 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -234,6 +234,12 @@ public SnapshotReader withLevelFilter(Filter levelFilter) {
return this;
}
+ @Override
+ public SnapshotReader enableValueFilter() {
+ scan.enableValueFilter();
+ return this;
+ }
+
@Override
public SnapshotReader withManifestEntryFilter(Filter filter) {
scan.withManifestEntryFilter(filter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index e56ee90412ce..b0cbe0772b5e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -319,6 +319,12 @@ public SnapshotReader withLevelFilter(Filter levelFilter) {
return this;
}
+ @Override
+ public SnapshotReader enableValueFilter() {
+ wrapped.enableValueFilter();
+ return this;
+ }
+
@Override
public SnapshotReader withManifestEntryFilter(Filter filter) {
wrapped.withManifestEntryFilter(filter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index e28ae3760534..deb149791c8f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -120,7 +120,8 @@ public List primaryKeys() {
public SnapshotReader newSnapshotReader() {
if (wrapped.schema().primaryKeys().size() > 0) {
return wrapped.newSnapshotReader()
- .withLevelFilter(level -> level == coreOptions().numLevels() - 1);
+ .withLevelFilter(level -> level == coreOptions().numLevels() - 1)
+ .enableValueFilter();
} else {
return wrapped.newSnapshotReader();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
index d967e778fe99..49fcfc8bd909 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ManifestReadThreadPool.java
@@ -54,9 +54,9 @@ public static Iterable sequentialBatchedExecute(
}
/** This method aims to parallel process tasks with randomly but return values sequentially. */
- public static Iterator randomlyExecute(
+ public static Iterator randomlyExecuteSequentialReturn(
Function> processor, List input, @Nullable Integer threadNum) {
ThreadPoolExecutor executor = getExecutorService(threadNum);
- return ThreadPoolUtils.randomlyExecute(executor, processor, input);
+ return ThreadPoolUtils.randomlyExecuteSequentialReturn(executor, processor, input);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
index 8c490e008baa..1c9d9664f22f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -66,11 +66,12 @@ public List read(
K key,
@Nullable Long fileSize,
Filter loadFilter,
- Filter readFilter)
+ Filter readFilter,
+ Filter readVFilter)
throws IOException {
Segments segments = cache.getIfPresents(key);
if (segments != null) {
- return readFromSegments(segments, readFilter);
+ return readFromSegments(segments, readFilter, readVFilter);
} else {
if (fileSize == null) {
fileSize = fileSizeFunction.apply(key);
@@ -78,15 +79,16 @@ public List read(
if (fileSize <= cache.maxElementSize()) {
segments = readSegments(key, fileSize, loadFilter);
cache.put(key, segments);
- return readFromSegments(segments, readFilter);
+ return readFromSegments(segments, readFilter, readVFilter);
} else {
return readFromIterator(
- reader.apply(key, fileSize), projectedSerializer, readFilter);
+ reader.apply(key, fileSize), projectedSerializer, readFilter, readVFilter);
}
}
}
- private List readFromSegments(Segments segments, Filter readFilter)
+ private List readFromSegments(
+ Segments segments, Filter readFilter, Filter readVFilter)
throws IOException {
InternalRowSerializer formatSerializer = this.formatSerializer.get();
List entries = new ArrayList<>();
@@ -98,7 +100,10 @@ private List readFromSegments(Segments segments, Filter readFilt
try {
formatSerializer.mapFromPages(binaryRow, view);
if (readFilter.test(binaryRow)) {
- entries.add(projectedSerializer.fromRow(binaryRow));
+ V v = projectedSerializer.fromRow(binaryRow);
+ if (readVFilter.test(v)) {
+ entries.add(v);
+ }
}
} catch (EOFException e) {
return entries;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 3c261f410324..b0bea8e66a82 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -94,7 +94,8 @@ public List read(String fileName) {
}
public List read(String fileName, @Nullable Long fileSize) {
- return read(fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue());
+ return read(
+ fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue(), Filter.alwaysTrue());
}
public List readWithIOException(String fileName) throws IOException {
@@ -103,7 +104,8 @@ public List readWithIOException(String fileName) throws IOException {
public List readWithIOException(String fileName, @Nullable Long fileSize)
throws IOException {
- return readWithIOException(fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue());
+ return readWithIOException(
+ fileName, fileSize, Filter.alwaysTrue(), Filter.alwaysTrue(), Filter.alwaysTrue());
}
public boolean exists(String fileName) {
@@ -118,9 +120,10 @@ public List read(
String fileName,
@Nullable Long fileSize,
Filter loadFilter,
- Filter readFilter) {
+ Filter readFilter,
+ Filter readTFilter) {
try {
- return readWithIOException(fileName, fileSize, loadFilter, readFilter);
+ return readWithIOException(fileName, fileSize, loadFilter, readFilter, readTFilter);
} catch (IOException e) {
throw new RuntimeException("Failed to read " + fileName, e);
}
@@ -130,14 +133,16 @@ private List readWithIOException(
String fileName,
@Nullable Long fileSize,
Filter loadFilter,
- Filter readFilter)
+ Filter readFilter,
+ Filter readTFilter)
throws IOException {
Path path = pathFactory.toPath(fileName);
if (cache != null) {
- return cache.read(path, fileSize, loadFilter, readFilter);
+ return cache.read(path, fileSize, loadFilter, readFilter, readTFilter);
}
- return readFromIterator(createIterator(path, fileSize), serializer, readFilter);
+ return readFromIterator(
+ createIterator(path, fileSize), serializer, readFilter, readTFilter);
}
public String writeWithoutRolling(Collection records) {
@@ -184,13 +189,17 @@ public void delete(String fileName) {
public static List readFromIterator(
CloseableIterator inputIterator,
ObjectSerializer serializer,
- Filter readFilter) {
+ Filter readFilter,
+ Filter readVFilter) {
try (CloseableIterator iterator = inputIterator) {
List result = new ArrayList<>();
while (iterator.hasNext()) {
InternalRow row = iterator.next();
if (readFilter.test(row)) {
- result.add(serializer.fromRow(row));
+ V v = serializer.fromRow(row);
+ if (readVFilter.test(v)) {
+ result.add(v);
+ }
}
}
return result;
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
index 2b9d0e0cb728..a0427d95cab1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
@@ -48,9 +48,7 @@ public void testGenericMetricsRegistration() {
ScanMetrics.SCAN_DURATION,
ScanMetrics.LAST_SCANNED_MANIFESTS,
ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES,
- ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES,
- ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS,
- ScanMetrics.LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER);
+ ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES);
}
/** Tests that the metrics are updated properly. */
@@ -66,14 +64,6 @@ public void testMetricsAreUpdated() {
(Histogram) registeredGenericMetrics.get(ScanMetrics.SCAN_DURATION);
Gauge lastScannedManifests =
(Gauge) registeredGenericMetrics.get(ScanMetrics.LAST_SCANNED_MANIFESTS);
- Gauge lastSkippedByPartitionAndStats =
- (Gauge)
- registeredGenericMetrics.get(
- ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS);
- Gauge lastSkippedByWholeBucketFilesFilter =
- (Gauge)
- registeredGenericMetrics.get(
- ScanMetrics.LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER);
Gauge lastScanSkippedTableFiles =
(Gauge)
registeredGenericMetrics.get(ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES);
@@ -85,8 +75,6 @@ public void testMetricsAreUpdated() {
assertThat(scanDuration.getCount()).isEqualTo(0);
assertThat(scanDuration.getStatistics().size()).isEqualTo(0);
assertThat(lastScannedManifests.getValue()).isEqualTo(0);
- assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(0);
- assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(0);
assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(0);
assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(0);
@@ -104,9 +92,7 @@ public void testMetricsAreUpdated() {
assertThat(scanDuration.getStatistics().getMax()).isEqualTo(200);
assertThat(scanDuration.getStatistics().getStdDev()).isEqualTo(0);
assertThat(lastScannedManifests.getValue()).isEqualTo(20);
- assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(25);
- assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(32);
- assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(57);
+ assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(25);
assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(10);
// report again
@@ -123,19 +109,17 @@ public void testMetricsAreUpdated() {
assertThat(scanDuration.getStatistics().getMax()).isEqualTo(500);
assertThat(scanDuration.getStatistics().getStdDev()).isCloseTo(212.132, offset(0.001));
assertThat(lastScannedManifests.getValue()).isEqualTo(22);
- assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(30);
- assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(33);
- assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(63);
+ assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(30);
assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(8);
}
private void reportOnce(ScanMetrics scanMetrics) {
- ScanStats scanStats = new ScanStats(200, 20, 25, 32, 10);
+ ScanStats scanStats = new ScanStats(200, 20, 25, 10);
scanMetrics.reportScan(scanStats);
}
private void reportAgain(ScanMetrics scanMetrics) {
- ScanStats scanStats = new ScanStats(500, 22, 30, 33, 8);
+ ScanStats scanStats = new ScanStats(500, 22, 30, 8);
scanMetrics.reportScan(scanStats);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 133913c487cd..51c8b328dfc6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -121,17 +121,6 @@
/** Tests for {@link PrimaryKeyFileStoreTable}. */
public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
- protected static final RowType COMPATIBILITY_ROW_TYPE =
- RowType.of(
- new DataType[] {
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.BIGINT(),
- DataTypes.BINARY(1),
- DataTypes.VARBINARY(1)
- },
- new String[] {"pt", "a", "b", "c", "d"});
-
protected static final Function COMPATIBILITY_BATCH_ROW_TO_STRING =
rowData ->
rowData.getInt(0)
@@ -144,12 +133,6 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
+ "|"
+ new String(rowData.getBinary(4));
- protected static final Function COMPATIBILITY_CHANGELOG_ROW_TO_STRING =
- rowData ->
- rowData.getRowKind().shortString()
- + " "
- + COMPATIBILITY_BATCH_ROW_TO_STRING.apply(rowData);
-
@Test
public void testMultipleWriters() throws Exception {
WriteSelector selector =
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
index 8a4f0b0612e7..9d3275e3ab48 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
@@ -58,17 +58,23 @@ public void test() throws IOException {
// test empty
map.put("k1", Collections.emptyList());
- List values = cache.read("k1", null, Filter.alwaysTrue(), Filter.alwaysTrue());
+ List values =
+ cache.read(
+ "k1", null, Filter.alwaysTrue(), Filter.alwaysTrue(), Filter.alwaysTrue());
assertThat(values).isEmpty();
// test values
List expect = Arrays.asList("v1", "v2", "v3");
map.put("k2", expect);
- values = cache.read("k2", null, Filter.alwaysTrue(), Filter.alwaysTrue());
+ values =
+ cache.read(
+ "k2", null, Filter.alwaysTrue(), Filter.alwaysTrue(), Filter.alwaysTrue());
assertThat(values).containsExactlyElementsOf(expect);
// test cache
- values = cache.read("k2", null, Filter.alwaysTrue(), Filter.alwaysTrue());
+ values =
+ cache.read(
+ "k2", null, Filter.alwaysTrue(), Filter.alwaysTrue(), Filter.alwaysTrue());
assertThat(values).containsExactlyElementsOf(expect);
// test filter
@@ -77,7 +83,8 @@ public void test() throws IOException {
"k2",
null,
Filter.alwaysTrue(),
- r -> r.getString(0).toString().endsWith("2"));
+ r -> r.getString(0).toString().endsWith("2"),
+ Filter.alwaysTrue());
assertThat(values).containsExactly("v2");
// test load filter
@@ -88,6 +95,7 @@ public void test() throws IOException {
"k3",
null,
r -> r.getString(0).toString().endsWith("2"),
+ Filter.alwaysTrue(),
Filter.alwaysTrue());
assertThat(values).containsExactly("v2");
@@ -99,6 +107,7 @@ public void test() throws IOException {
"k4",
null,
r -> r.getString(0).toString().endsWith("5"),
+ Filter.alwaysTrue(),
Filter.alwaysTrue());
assertThat(values).isEmpty();
@@ -117,6 +126,7 @@ public void test() throws IOException {
k,
null,
Filter.alwaysTrue(),
+ Filter.alwaysTrue(),
Filter.alwaysTrue()))
.containsExactly(k);
} catch (IOException e) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index b7eb1d625ce3..559976921e2e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -77,7 +77,7 @@ public SplitEnumerator restoreEnu
nextSnapshotId = checkpoint.currentSnapshotId();
splits = checkpoint.splits();
}
- StreamTableScan scan = readBuilder.dropStats().newStreamScan();
+ StreamTableScan scan = readBuilder.newStreamScan();
if (metricGroup(context) != null) {
((StreamDataTableScan) scan)
.withMetricsRegistry(new FlinkMetricRegistry(context.metricGroup()));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index a648bfba607d..b3dcd4840cc1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -178,7 +178,7 @@ private ReadBuilder createReadBuilder() {
if (limit != null) {
readBuilder.withLimit(limit.intValue());
}
- return readBuilder;
+ return readBuilder.dropStats();
}
private DataStream buildStaticFileSource() {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 9bfd36fdfaa8..12b579589d0f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -31,6 +31,7 @@
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -172,12 +173,7 @@ protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
protected void scanSplitsForInference() {
if (splitStatistics == null) {
if (table instanceof DataTable) {
- List partitionEntries =
- table.newReadBuilder()
- .withFilter(predicate)
- .dropStats()
- .newScan()
- .listPartitionEntries();
+ List partitionEntries = newTableScan().listPartitionEntries();
long totalSize = 0;
long rowCount = 0;
for (PartitionEntry entry : partitionEntries) {
@@ -188,13 +184,7 @@ protected void scanSplitsForInference() {
splitStatistics =
new SplitStatistics((int) (totalSize / splitTargetSize + 1), rowCount);
} else {
- List splits =
- table.newReadBuilder()
- .withFilter(predicate)
- .dropStats()
- .newScan()
- .plan()
- .splits();
+ List splits = newTableScan().plan().splits();
splitStatistics =
new SplitStatistics(
splits.size(), splits.stream().mapToLong(Split::rowCount).sum());
@@ -202,6 +192,10 @@ protected void scanSplitsForInference() {
}
}
+ private TableScan newTableScan() {
+ return table.newReadBuilder().dropStats().withFilter(predicate).newScan();
+ }
+
/** Split statistics for inferring row count and parallelism size. */
protected static class SplitStatistics {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index c388a6dccbbc..af425aab5e46 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -87,7 +87,7 @@ public SplitEnumerator restoreEnu
private List getSplits(SplitEnumeratorContext context) {
FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
- TableScan scan = readBuilder.dropStats().newScan();
+ TableScan scan = readBuilder.newScan();
// register scan metrics
if (context.metricGroup() != null) {
((InnerTableScan) scan)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
index f21922670471..3805f6f8c536 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
@@ -106,7 +106,7 @@ public MonitorFunction(
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- this.scan = readBuilder.dropStats().newStreamScan();
+ this.scan = readBuilder.newStreamScan();
this.checkpointState =
context.getOperatorStateStore()
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index 33cbc19e0326..144afab8e1fa 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -96,7 +96,8 @@ public static InputSplit[] generateSplits(FileStoreTable table, JobConf jobConf)
scan.withFilter(PredicateBuilder.and(predicatePerPartition));
}
}
- scan.plan()
+ scan.dropStats()
+ .plan()
.splits()
.forEach(
split ->
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index 95c8f4b3a9a8..f29c146b775a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -62,7 +62,7 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
_readBuilder.withFilter(pushedPredicate)
}
pushDownLimit.foreach(_readBuilder.withLimit)
- _readBuilder
+ _readBuilder.dropStats()
}
final def metadataColumns: Seq[PaimonMetadataColumn] = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
index 19f73cb6cc68..9a88ca2e4c3a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.commands
+import org.apache.paimon.manifest.PartitionEntry
import org.apache.paimon.schema.TableSchema
import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
@@ -64,11 +65,9 @@ case class PaimonAnalyzeTableColumnCommand(
// compute stats
val totalSize = table
.newScan()
- .plan()
- .splits()
+ .listPartitionEntries()
.asScala
- .flatMap { case split: DataSplit => split.dataFiles().asScala }
- .map(_.fileSize())
+ .map(_.fileSizeInBytes())
.sum
val (mergedRecordCount, colStats) =
PaimonStatsUtils.computeColumnStats(sparkSession, relation, attributes)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index b44a66fce3ff..7e61d71ac183 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -44,7 +44,8 @@ private[spark] trait StreamHelper {
var lastTriggerMillis: Long
- private lazy val streamScan: StreamDataTableScan = table.newStreamScan()
+ private lazy val streamScan: StreamDataTableScan =
+ table.newStreamScan().dropStats().asInstanceOf[StreamDataTableScan]
private lazy val partitionSchema: StructType =
SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(), table.partitionKeys()))