Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public List<Split> splits() {
}

private List<Pair<LinkedHashMap<String, String>, Path>> findPartitions() {
boolean onlyValueInPath = coreOptions.formatTablePartitionOnlyValueInPath();
if (partitionFilter instanceof MultiplePartitionPredicate) {
// generate partitions directly
Set<BinaryRow> partitions = ((MultiplePartitionPredicate) partitionFilter).partitions();
Expand All @@ -160,7 +161,7 @@ private List<Pair<LinkedHashMap<String, String>, Path>> findPartitions() {
table.defaultPartName(),
new Path(table.location()),
partitions,
coreOptions.formatTablePartitionOnlyValueInPath());
onlyValueInPath);
} else {
// search paths
Pair<Path, Integer> scanPathAndLevel =
Expand All @@ -169,15 +170,13 @@ private List<Pair<LinkedHashMap<String, String>, Path>> findPartitions() {
table.partitionKeys(),
partitionFilter,
table.partitionType(),
coreOptions.formatTablePartitionOnlyValueInPath());
Path scanPath = scanPathAndLevel.getLeft();
int level = scanPathAndLevel.getRight();
onlyValueInPath);
return searchPartSpecAndPaths(
table.fileIO(),
scanPath,
level,
scanPathAndLevel.getLeft(),
scanPathAndLevel.getRight(),
table.partitionKeys(),
coreOptions.formatTablePartitionOnlyValueInPath());
onlyValueInPath);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
Expand Down Expand Up @@ -272,23 +270,28 @@ public static List<Pair<LinkedHashMap<String, String>, Path>> searchPartSpecAndP
FileIO fileIO,
Path path,
int partitionNumber,
@Nullable List<String> partitionKeys,
boolean enablePartitionOnlyValueInPath) {
List<String> partitionKeys,
boolean onlyValueInPath) {
FileStatus[] generatedParts = getFileStatusRecurse(path, partitionNumber, fileIO);
List<Pair<LinkedHashMap<String, String>, Path>> ret = new ArrayList<>();
for (FileStatus part : generatedParts) {
// ignore hidden file
if (isHiddenFile(part)) {
continue;
}
if (enablePartitionOnlyValueInPath && partitionKeys != null) {
if (onlyValueInPath) {
ret.add(
Pair.of(
extractPartitionSpecFromPathOnlyValue(
part.getPath(), partitionKeys),
part.getPath()));
} else {
ret.add(Pair.of(extractPartitionSpecFromPath(part.getPath()), part.getPath()));
LinkedHashMap<String, String> spec = extractPartitionSpecFromPath(part.getPath());
if (spec.size() != partitionKeys.size()) {
// illegal path, for example: /path/to/table/tmp/unknown, path without "="
continue;
}
ret.add(Pair.of(spec, part.getPath()));
}
}
return ret;
Expand All @@ -314,6 +317,10 @@ private static void listStatusRecursively(
int expectLevel,
List<FileStatus> results)
throws IOException {
if (isHiddenFile(fileStatus.getPath())) {
return;
}

if (expectLevel == level) {
results.add(fileStatus);
return;
Expand All @@ -327,7 +334,11 @@ private static void listStatusRecursively(
}

/**
 * Checks whether the given file status refers to a hidden file by delegating to the
 * {@code Path}-based overload with the status's path.
 *
 * @param fileStatus status whose path is inspected
 * @return the result of {@code isHiddenFile(fileStatus.getPath())}
 */
private static boolean isHiddenFile(FileStatus fileStatus) {
    // The naming rule lives in the Path overload, so no local copy of the name is needed here.
    return isHiddenFile(fileStatus.getPath());
}

/**
 * Tells whether a path is treated as hidden, i.e. its name begins with an underscore or a dot.
 *
 * @param path path whose name (as returned by {@code getName()}) is checked
 * @return {@code true} if the name starts with {@code "_"} or {@code "."}, otherwise {@code
 *     false}
 */
private static boolean isHiddenFile(Path path) {
    final String fileName = path.getName();
    if (fileName.startsWith("_")) {
        return true;
    }
    return fileName.startsWith(".");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void testComputeScanPathWithoutFilter() throws IOException {
partitionType,
enablePartitionValueOnly);

// Should optimize to specific partition path for first key
// Should not be optimized because of greater than
assertThat(result.getLeft()).isEqualTo(tableLocation);
assertThat(result.getRight()).isEqualTo(2);

Expand Down Expand Up @@ -202,6 +202,7 @@ void testGetScanPathAndLevelWithEqualityFilter() throws IOException {
partitionType,
enablePartitionValueOnly);
String partitionPath = enablePartitionValueOnly ? "2023/12" : "year=2023/month=12";

// Should optimize to specific partition path
assertThat(result.getLeft().toString()).isEqualTo(tableLocation + partitionPath);
assertThat(result.getRight()).isEqualTo(0);
Expand Down Expand Up @@ -265,6 +266,82 @@ void testComputeScanPathWithFirstLevel() throws IOException {
assertThat(searched.size()).isEqualTo(1);
}

/**
 * Checks that a filter with only a range condition on the first partition key and an equality
 * condition on the second key leaves the scan path at the table location with two levels to
 * list, and that {@code searchPartSpecAndPaths} then finds the single created partition.
 */
@TestTemplate
void testNoOptimizationWithSecondEquality() throws IOException {
    Path tableLocation = new Path(tmpPath.toUri());
    // First key (index 0) gets a range condition; only the second key (index 1) gets equality
    PredicateBuilder builder = new PredicateBuilder(partitionType);
    Predicate predicate =
            PredicateBuilder.and(builder.greaterOrEqual(0, 2023), builder.equal(1, 12));
    PartitionPredicate partitionFilter =
            PartitionPredicate.fromPredicate(partitionType, predicate);

    Pair<Path, Integer> result =
            FormatTableScan.computeScanPathAndLevel(
                    tableLocation,
                    partitionKeys,
                    partitionFilter,
                    partitionType,
                    enablePartitionValueOnly);

    // Should not optimize with second equality filter
    assertThat(result.getLeft()).isEqualTo(tableLocation);
    assertThat(result.getRight()).isEqualTo(2);

    // test searchPartSpecAndPaths
    LocalFileIO fileIO = LocalFileIO.create();
    String partitionPath = enablePartitionValueOnly ? "2023/12" : "year=2023/month=12";
    fileIO.mkdirs(new Path(tableLocation, partitionPath));
    List<Pair<LinkedHashMap<String, String>, Path>> searched =
            searchPartSpecAndPaths(
                    fileIO,
                    result.getLeft(),
                    result.getRight(),
                    partitionKeys,
                    enablePartitionValueOnly);
    LinkedHashMap<String, String> expectPartitionSpec =
            new LinkedHashMap<>(partitionKeys.size());
    expectPartitionSpec.put("year", "2023");
    expectPartitionSpec.put("month", "12");
    // Check the size first so an empty result fails as an assertion, not as an
    // IndexOutOfBoundsException from get(0)
    assertThat(searched).hasSize(1);
    assertThat(searched.get(0).getLeft()).isEqualTo(expectPartitionSpec);
}

/**
 * Checks that {@code searchPartSpecAndPaths} returns only the well-formed partition directory
 * when an illegal directory of the same depth exists next to it under the table location.
 */
@TestTemplate
void testSkipIllegalPath() throws IOException {
    Path tableLocation = new Path(tmpPath.toUri());
    PartitionPredicate partitionFilter = PartitionPredicate.fromPredicate(partitionType, null);
    Pair<Path, Integer> result =
            FormatTableScan.computeScanPathAndLevel(
                    tableLocation,
                    partitionKeys,
                    partitionFilter,
                    partitionType,
                    enablePartitionValueOnly);

    LocalFileIO fileIO = LocalFileIO.create();
    // The illegal directory differs per layout: with value-only paths it is a hidden
    // ("_"-prefixed) directory, with key=value paths it is a directory without "="
    String illegalPath =
            enablePartitionValueOnly
                    ? "_unknown-year/unknown-month"
                    : "unknown-year/unknown-month";
    fileIO.mkdirs(new Path(tableLocation, illegalPath));
    String partitionPath = enablePartitionValueOnly ? "2023/12" : "year=2023/month=12";
    fileIO.mkdirs(new Path(tableLocation, partitionPath));
    List<Pair<LinkedHashMap<String, String>, Path>> searched =
            searchPartSpecAndPaths(
                    fileIO,
                    result.getLeft(),
                    result.getRight(),
                    partitionKeys,
                    enablePartitionValueOnly);
    LinkedHashMap<String, String> expectPartitionSpec =
            new LinkedHashMap<>(partitionKeys.size());
    expectPartitionSpec.put("year", "2023");
    expectPartitionSpec.put("month", "12");
    // Check the size first so an empty result fails as an assertion, not as an
    // IndexOutOfBoundsException from get(0)
    assertThat(searched).hasSize(1);
    assertThat(searched.get(0).getLeft()).isEqualTo(expectPartitionSpec);
}

@TestTemplate
void testComputeScanPathAndLevel() {
Path tableLocation = new Path(tmpPath.toUri());
Expand Down
Loading