From 4b4eef7d18c235c068011870c305ecb4937306c1 Mon Sep 17 00:00:00 2001 From: Yann Date: Tue, 21 May 2024 21:48:15 +0800 Subject: [PATCH 1/7] [core] introduce deletedIndexFiles in IndexIncrement and allow multi index files in the same partition and bucket --- .../org/apache/paimon/AbstractFileStore.java | 3 +- .../DeletionVectorsIndexFile.java | 18 ++- .../DeletionVectorsMaintainer.java | 27 +++- .../paimon/index/HashIndexMaintainer.java | 2 +- .../apache/paimon/index/IndexFileHandler.java | 68 ++++---- .../org/apache/paimon/io/IndexIncrement.java | 31 +++- .../paimon/manifest/IndexManifestFile.java | 33 ++-- .../manifest/IndexManifestFileHandler.java | 150 ++++++++++++++++++ .../paimon/operation/FileStoreCommitImpl.java | 29 +++- .../table/sink/CommitMessageSerializer.java | 5 +- .../paimon/table/sink/TableCommitImpl.java | 4 + .../source/snapshot/SnapshotReaderImpl.java | 52 +++--- .../DeletionVectorsIndexFileTest.java | 14 +- .../DeletionVectorsMaintainerTest.java | 3 +- .../paimon/operation/FileStoreCommitTest.java | 8 +- 15 files changed, 336 insertions(+), 111 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 8cbf433f61f9..7528b393d6e0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -191,7 +191,8 @@ public FileStoreCommitImpl newCommit(String commitUser) { partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(), newKeyComparator(), options.branch(), - newStatsFileHandler()); + newStatsFileHandler(), + bucketMode()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java index 733fb7825440..a6283ba97ec4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java @@ -22,6 +22,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.index.IndexFile; +import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.PathFactory; @@ -30,6 +31,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -48,18 +50,20 @@ public DeletionVectorsIndexFile(FileIO fileIO, PathFactory pathFactory) { /** * Reads all deletion vectors from a specified file. * - * @param fileName The name of the file from which to read the deletion vectors. - * @param deletionVectorRanges A map where the key represents which file the DeletionVector - * belongs to and the value is a Pair object specifying the range (start position and size) - * within the file where the deletion vector data is located. * @return A map where the key represents which file the DeletionVector belongs to, and the * value is the corresponding DeletionVector object. * @throws UncheckedIOException If an I/O error occurs while reading from the file. */ - public Map readAllDeletionVectors( - String fileName, LinkedHashMap> deletionVectorRanges) { + public Map readAllDeletionVectors(IndexFileMeta fileMeta) { + LinkedHashMap> deletionVectorRanges = + fileMeta.deletionVectorsRanges(); + if (deletionVectorRanges == null || deletionVectorRanges.isEmpty()) { + return Collections.emptyMap(); + } + + String indexFileName = fileMeta.fileName(); Map deletionVectors = new HashMap<>(); - Path filePath = pathFactory.toPath(fileName); + Path filePath = pathFactory.toPath(indexFileName); try (SeekableInputStream inputStream = fileIO.newInputStream(filePath)) { checkVersion(inputStream); DataInputStream dataInputStream = new DataInputStream(inputStream); diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java index 3beb996238de..461e092e0396 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.manifest.IndexManifestEntry; import javax.annotation.Nullable; @@ -30,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; @@ -129,15 +131,26 @@ public Factory(IndexFileHandler handler) { public DeletionVectorsMaintainer createOrRestore( @Nullable Long snapshotId, BinaryRow partition, int bucket) { - IndexFileMeta indexFile = + List indexFiles = snapshotId == null - ? null - : handler.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket) - .orElse(null); + ? Collections.emptyList() + : handler.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket); Map deletionVectors = - indexFile == null - ? new HashMap<>() - : new HashMap<>(handler.readAllDeletionVectors(indexFile)); + new HashMap<>(handler.readAllDeletionVectors(indexFiles)); + return createOrRestore(deletionVectors); + } + + @VisibleForTesting + public DeletionVectorsMaintainer createOrRestore( + @Nullable Long snapshotId, BinaryRow partition) { + List indexFiles = + snapshotId == null + ? Collections.emptyList() + : handler.scan(snapshotId, DELETION_VECTORS_INDEX, partition).stream() + .map(IndexManifestEntry::indexFile) + .collect(Collectors.toList()); + Map deletionVectors = + new HashMap<>(handler.readAllDeletionVectors(indexFiles)); return createOrRestore(deletionVectors); } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java index 66a8d6409a9c..41e4865253b6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java @@ -51,7 +51,7 @@ private HashIndexMaintainer( IntHashSet hashcode = new IntHashSet(); if (snapshotId != null) { Optional indexFile = - fileHandler.scan(snapshotId, HashIndexFile.HASH_INDEX, partition, bucket); + fileHandler.scanHashIndex(snapshotId, partition, bucket); if (indexFile.isPresent()) { IndexFileMeta file = indexFile.get(); hashcode = new IntHashSet((int) file.rowCount()); diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index fdc2fe754ff8..40c676362c77 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -34,6 +34,7 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -64,20 +65,38 @@ public IndexFileHandler( this.deletionVectorsIndex = deletionVectorsIndex; } - public Optional scan( + public DeletionVectorsIndexFile deletionVectorsIndex() { + return this.deletionVectorsIndex; + } + + public List scan() { + Snapshot snapshot = snapshotManager.latestSnapshot(); + String indexManifest = snapshot.indexManifest(); + if (indexManifest == null) { + return Collections.emptyList(); + } + return indexManifestFile.read(indexManifest); + } + + public Optional scanHashIndex(long snapshotId, BinaryRow partition, int bucket) { + List result = scan(snapshotId, HASH_INDEX, partition, bucket); + if (result.size() > 1) { + throw new IllegalArgumentException( + "Find multiple hash index files for one bucket: " + result); + } + return result.isEmpty() ? Optional.empty() : Optional.of(result.get(0)); + } + + public List scan( long snapshotId, String indexType, BinaryRow partition, int bucket) { List entries = scan(snapshotId, indexType, partition); - List result = new ArrayList<>(); + List result = new ArrayList<>(); for (IndexManifestEntry file : entries) { if (file.bucket() == bucket) { - result.add(file); + result.add(file.indexFile()); } } - if (result.size() > 1) { - throw new IllegalArgumentException( - "Find multiple index files for one bucket: " + result); - } - return result.isEmpty() ? Optional.empty() : Optional.of(result.get(0).indexFile()); + return result; } public List scan(String indexType, BinaryRow partition) { @@ -167,31 +186,16 @@ public void deleteManifest(String indexManifest) { indexManifestFile.delete(indexManifest); } - public Map readAllDeletionVectors(IndexFileMeta fileMeta) { - if (!fileMeta.indexType().equals(DELETION_VECTORS_INDEX)) { - throw new IllegalArgumentException( - "Input file is not deletion vectors index " + fileMeta.indexType()); - } - LinkedHashMap> deleteIndexRange = - fileMeta.deletionVectorsRanges(); - if (deleteIndexRange == null || deleteIndexRange.isEmpty()) { - return Collections.emptyMap(); - } - return deletionVectorsIndex.readAllDeletionVectors(fileMeta.fileName(), deleteIndexRange); - } - - public Optional readDeletionVector(IndexFileMeta fileMeta, String fileName) { - if (!fileMeta.indexType().equals(DELETION_VECTORS_INDEX)) { - throw new IllegalArgumentException( - "Input file is not deletion vectors index " + fileMeta.indexType()); - } - Map> deleteIndexRange = fileMeta.deletionVectorsRanges(); - if (deleteIndexRange == null || !deleteIndexRange.containsKey(fileName)) { - return Optional.empty(); + public Map readAllDeletionVectors(List fileMetas) { + Map deletionVectors = new HashMap<>(); + for (IndexFileMeta indexFile : fileMetas) { + if (!indexFile.indexType().equals(DELETION_VECTORS_INDEX)) { + throw new IllegalArgumentException( + "Input file is not deletion vectors index " + indexFile.indexType()); + } + deletionVectors.putAll(deletionVectorsIndex.readAllDeletionVectors(indexFile)); } - return Optional.of( - deletionVectorsIndex.readDeletionVector( - fileMeta.fileName(), deleteIndexRange.get(fileName))); + return deletionVectors; } public IndexFileMeta writeDeletionVectorsIndex(Map deletionVectors) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java index a62d24109cc2..8e53adfbfc4a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java @@ -20,6 +20,8 @@ import org.apache.paimon.index.IndexFileMeta; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; @@ -28,16 +30,29 @@ public class IndexIncrement { private final List newIndexFiles; + private final List deletedIndexFiles; + public IndexIncrement(List newIndexFiles) { this.newIndexFiles = newIndexFiles; + this.deletedIndexFiles = Collections.emptyList(); + } + + public IndexIncrement( + List newIndexFiles, List deletedIndexFiles) { + this.newIndexFiles = newIndexFiles; + this.deletedIndexFiles = deletedIndexFiles; } public List newIndexFiles() { return newIndexFiles; } + public List deletedIndexFiles() { + return deletedIndexFiles; + } + public boolean isEmpty() { - return newIndexFiles.isEmpty(); + return newIndexFiles.isEmpty() && deletedIndexFiles.isEmpty(); } @Override @@ -49,16 +64,24 @@ public boolean equals(Object o) { return false; } IndexIncrement that = (IndexIncrement) o; - return Objects.equals(newIndexFiles, that.newIndexFiles); + return Objects.equals(newIndexFiles, that.newIndexFiles) + && Objects.equals(deletedIndexFiles, that.deletedIndexFiles); } @Override public int hashCode() { - return Objects.hash(newIndexFiles); + List all = new ArrayList<>(newIndexFiles); + all.addAll(deletedIndexFiles); + return Objects.hash(all); } @Override public String toString() { - return "IndexIncrement{" + "newIndexFiles=" + newIndexFiles + '}'; + return "IndexIncrement{" + + "newIndexFiles=" + + newIndexFiles + + ",deletedIndexFiles=" + + deletedIndexFiles + + "}"; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java index 235f8c509416..8a91f348311e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java @@ -22,7 +22,7 @@ import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.FormatWriterFactory; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.manifest.IndexManifestEntry.Identifier; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.ObjectsFile; @@ -31,10 +31,7 @@ import javax.annotation.Nullable; -import java.util.ArrayList; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; /** Index manifest file. */ public class IndexManifestFile extends ObjectsFile { @@ -53,27 +50,17 @@ private IndexManifestFile( null); } - /** Merge new index files to index manifest. */ + /** Write new index files to index manifest. */ @Nullable - public String merge( - @Nullable String previousIndexManifest, List newIndexFiles) { - String indexManifest = previousIndexManifest; - if (newIndexFiles.size() > 0) { - Map indexEntries = new LinkedHashMap<>(); - List entries = - indexManifest == null ? new ArrayList<>() : read(indexManifest); - entries.addAll(newIndexFiles); - for (IndexManifestEntry file : entries) { - if (file.kind() == FileKind.ADD) { - indexEntries.put(file.identifier(), file); - } else { - indexEntries.remove(file.identifier()); - } - } - indexManifest = writeWithoutRolling(indexEntries.values()); + public String writeIndexFiles( + @Nullable String previousIndexManifest, + List newIndexFiles, + BucketMode bucketMode) { + if (newIndexFiles.isEmpty()) { + return previousIndexManifest; } - - return indexManifest; + IndexManifestFileHandler handler = new IndexManifestFileHandler(this, bucketMode); + return handler.write(previousIndexManifest, newIndexFiles); } /** Creator of {@link IndexManifestFile}. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java new file mode 100644 index 000000000000..d7a952f239b2 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.manifest; + +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.utils.Pair; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; +import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; + +/** IndexManifestFile Handler */ +public class IndexManifestFileHandler { + + private final IndexManifestFile indexManifestFile; + + private final BucketMode bucketMode; + + IndexManifestFileHandler(IndexManifestFile indexManifestFile, BucketMode bucketMode) { + this.indexManifestFile = indexManifestFile; + this.bucketMode = bucketMode; + } + + String write(@Nullable String previousIndexManifest, List newIndexFiles) { + List entries = + previousIndexManifest == null + ? new ArrayList<>() + : indexManifestFile.read(previousIndexManifest); + Pair, List> previous = + separateIndexEntries(entries); + Pair, List> current = + separateIndexEntries(newIndexFiles); + + // Step1: get the hash index files; + List indexEntries = + getIndexManifestFileCombine(HASH_INDEX) + .combine(previous.getLeft(), current.getLeft()); + + // Step2: get the dv index files; + indexEntries.addAll( + getIndexManifestFileCombine(DELETION_VECTORS_INDEX) + .combine(previous.getRight(), current.getRight())); + + return indexManifestFile.writeWithoutRolling(indexEntries); + } + + private Pair, List> separateIndexEntries( + List indexFiles) { + List hashEntries = new ArrayList<>(); + List dvEntries = new ArrayList<>(); + for (IndexManifestEntry entry : indexFiles) { + String indexType = entry.indexFile().indexType(); + if (indexType.equals(DELETION_VECTORS_INDEX)) { + dvEntries.add(entry); + } else if (indexType.equals(HASH_INDEX)) { + hashEntries.add(entry); + } else { + throw new IllegalArgumentException("Can't recognize this index type: " + indexType); + } + } + return Pair.of(hashEntries, dvEntries); + } + + private IndexManifestFileCombiner getIndexManifestFileCombine(String indexType) { + if (DELETION_VECTORS_INDEX.equals(indexType) && BucketMode.BUCKET_UNAWARE == bucketMode) { + return new UnawareBucketCombiner(); + } else { + return new CommonBucketCombiner(); + } + } + + interface IndexManifestFileCombiner { + List combine( + List prevIndexFiles, List newIndexFiles); + } + + /** + * We combine the previous and new index files by the file name. This is only used for tables + * with UnawareBucket. + */ + static class UnawareBucketCombiner implements IndexManifestFileCombiner { + + @Override + public List combine( + List prevIndexFiles, List newIndexFiles) { + Map indexEntries = new LinkedHashMap<>(); + for (IndexManifestEntry entry : prevIndexFiles) { + if (entry.kind() == FileKind.ADD) { + indexEntries.put(entry.indexFile().fileName(), entry); + } + } + + for (IndexManifestEntry entry : newIndexFiles) { + if (entry.kind() == FileKind.ADD) { + indexEntries.put(entry.indexFile().fileName(), entry); + } else { + indexEntries.remove(entry.indexFile().fileName()); + } + } + return new ArrayList<>(indexEntries.values()); + } + } + + /** We combine the previous and new index files by {@link IndexManifestEntry#identifier} */ + static class CommonBucketCombiner implements IndexManifestFileCombiner { + + @Override + public List combine( + List prevIndexFiles, List newIndexFiles) { + Map indexEntries = + prevIndexFiles.stream() + .filter(entry -> entry.kind() == FileKind.ADD) + .collect( + Collectors.toMap( + IndexManifestEntry::identifier, Function.identity())); + for (IndexManifestEntry entry : newIndexFiles) { + if (entry.kind() == FileKind.ADD) { + indexEntries.put(entry.identifier(), entry); + } else { + indexEntries.remove(entry.identifier()); + } + } + return new ArrayList<>(indexEntries.values()); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 7ddf8c210284..0001d5b19867 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -44,6 +44,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.stats.Statistics; import org.apache.paimon.stats.StatsFileHandler; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.types.RowType; @@ -125,6 +126,8 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final StatsFileHandler statsFileHandler; + private final BucketMode bucketMode; + public FileStoreCommitImpl( FileIO fileIO, SchemaManager schemaManager, @@ -143,7 +146,8 @@ public FileStoreCommitImpl( boolean dynamicPartitionOverwrite, @Nullable Comparator keyComparator, String branchName, - StatsFileHandler statsFileHandler) { + StatsFileHandler statsFileHandler, + BucketMode bucketMode) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.commitUser = commitUser; @@ -166,6 +170,7 @@ public FileStoreCommitImpl( this.ignoreEmptyCommit = true; this.commitMetrics = null; this.statsFileHandler = statsFileHandler; + this.bucketMode = bucketMode; } @Override @@ -633,6 +638,24 @@ private void collectChanges( "Unknown index type: " + f.indexType()); } }); + commitMessage + .indexIncrement() + .deletedIndexFiles() + .forEach( + f -> { + if (f.indexType().equals(DELETION_VECTORS_INDEX)) { + compactDvIndexFiles.add( + new IndexManifestEntry( + FileKind.DELETE, + commitMessage.partition(), + commitMessage.bucket(), + f)); + } else { + throw new RuntimeException( + "This index type is not supported to delete: " + + f.indexType()); + } + }); } } @@ -834,7 +857,9 @@ public boolean tryCommitOnce( } // write new index manifest - String indexManifest = indexManifestFile.merge(previousIndexManifest, indexFiles); + String indexManifest = + indexManifestFile.writeIndexFiles( + previousIndexManifest, indexFiles, bucketMode); if (!Objects.equals(indexManifest, previousIndexManifest)) { newIndexManifest = indexManifest; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index 24c9fc8924b2..a7b566b32994 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -81,6 +81,7 @@ private void serialize(CommitMessage obj, DataOutputView view) throws IOExceptio dataFileSerializer.serializeList(message.compactIncrement().compactAfter(), view); dataFileSerializer.serializeList(message.compactIncrement().changelogFiles(), view); indexEntrySerializer.serializeList(message.indexIncrement().newIndexFiles(), view); + indexEntrySerializer.serializeList(message.indexIncrement().deletedIndexFiles(), view); } @Override @@ -124,6 +125,8 @@ private CommitMessage deserialize(DataInputView view) throws IOException { dataFileSerializer.deserializeList(view), dataFileSerializer.deserializeList(view), dataFileSerializer.deserializeList(view)), - new IndexIncrement(indexEntrySerializer.deserializeList(view))); + new IndexIncrement( + indexEntrySerializer.deserializeList(view), + indexEntrySerializer.deserializeList(view))); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index eab7a0c6d036..d244c2a947c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -281,6 +281,10 @@ private void checkFilesExistence(List committables) { .map(IndexFileMeta::fileName) .map(indexFileFactory::toPath) .forEach(files::add); + msg.indexIncrement().deletedIndexFiles().stream() + .map(IndexFileMeta::fileName) + .map(indexFileFactory::toPath) + .forEach(files::add); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 2ed258e85a59..ffb52ce6fece 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -48,8 +48,6 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TypeUtils; -import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -284,12 +282,11 @@ private List generateSplits( ? splitGenerator.splitForStreaming(bucketFiles) : splitGenerator.splitForBatch(bucketFiles); - IndexFileMeta deletionIndexFile = + List deletionIndexFiles = deletionVectors - ? indexFileHandler - .scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket) - .orElse(null) - : null; + ? indexFileHandler.scan( + snapshotId, DELETION_VECTORS_INDEX, partition, bucket) + : Collections.emptyList(); for (SplitGenerator.SplitGroup splitGroup : splitGroups) { List dataFiles = splitGroup.files; String bucketPath = pathFactory.bucketPath(partition, bucket).toString(); @@ -298,7 +295,7 @@ private List generateSplits( .withBucketPath(bucketPath); if (deletionVectors) { builder.withDataDeletionFiles( - getDeletionFiles(dataFiles, deletionIndexFile)); + getDeletionFiles(dataFiles, deletionIndexFiles)); } splits.add(builder.build()); @@ -373,16 +370,15 @@ private Plan toChangesPlan( .isStreaming(isStreaming) .withBucketPath(pathFactory.bucketPath(part, bucket).toString()); if (deletionVectors) { - IndexFileMeta beforeDeletionIndex = - indexFileHandler - .scan(beforeSnapshotId, DELETION_VECTORS_INDEX, part, bucket) - .orElse(null); - IndexFileMeta deletionIndex = - indexFileHandler - .scan(plan.snapshotId(), DELETION_VECTORS_INDEX, part, bucket) - .orElse(null); - builder.withBeforeDeletionFiles(getDeletionFiles(before, beforeDeletionIndex)); - builder.withDataDeletionFiles(getDeletionFiles(data, deletionIndex)); + List beforeDeletionIndexes = + indexFileHandler.scan( + beforeSnapshotId, DELETION_VECTORS_INDEX, part, bucket); + List deletionIndexes = + indexFileHandler.scan( + plan.snapshotId(), DELETION_VECTORS_INDEX, part, bucket); + builder.withBeforeDeletionFiles( + getDeletionFiles(before, beforeDeletionIndexes)); + builder.withDataDeletionFiles(getDeletionFiles(data, deletionIndexes)); } splits.add(builder.build()); } @@ -412,14 +408,22 @@ private RecordComparator partitionComparator() { } private List getDeletionFiles( - List dataFiles, @Nullable IndexFileMeta indexFileMeta) { + List dataFiles, List indexFileMetas) { List deletionFiles = new ArrayList<>(dataFiles.size()); - Map> deletionRanges = - indexFileMeta == null ? null : indexFileMeta.deletionVectorsRanges(); + Map dataFileToIndexFileMeta = new HashMap<>(); + for (IndexFileMeta indexFileMeta : indexFileMetas) { + if (indexFileMeta.deletionVectorsRanges() != null) { + for (String dataFileName : indexFileMeta.deletionVectorsRanges().keySet()) { + dataFileToIndexFileMeta.put(dataFileName, indexFileMeta); + } + } + } for (DataFileMeta file : dataFiles) { - if (deletionRanges != null) { - Pair range = deletionRanges.get(file.fileName()); - if (range != null) { + IndexFileMeta indexFileMeta = dataFileToIndexFileMeta.get(file.fileName()); + if (indexFileMeta != null) { + Map> ranges = indexFileMeta.deletionVectorsRanges(); + if (ranges != null && ranges.containsKey(file.fileName())) { + Pair range = ranges.get(file.fileName()); deletionFiles.add( new DeletionFile( indexFileHandler.filePath(indexFileMeta).toString(), diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java index fbaa407d2b30..c43950a447c0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.PathFactory; @@ -32,6 +33,7 @@ import java.util.Random; import java.util.UUID; +import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link DeletionVectorsIndexFile}. */ @@ -66,8 +68,10 @@ public void testReadDvIndex() { LinkedHashMap> deletionVectorRanges = pair.getRight(); // read + IndexFileMeta indexFileMeta = + new IndexFileMeta(DELETION_VECTORS_INDEX, fileName, 0L, 0L, deletionVectorRanges); Map actualDeleteMap = - deletionVectorsIndexFile.readAllDeletionVectors(fileName, deletionVectorRanges); + deletionVectorsIndexFile.readAllDeletionVectors(indexFileMeta); assertThat(actualDeleteMap.get("file1.parquet").isDeleted(1)).isTrue(); assertThat(actualDeleteMap.get("file1.parquet").isDeleted(2)).isFalse(); assertThat(actualDeleteMap.get("file2.parquet").isDeleted(2)).isTrue(); @@ -104,8 +108,10 @@ public void testReadDvIndexWithCopiousDv() { deletionVectorsIndexFile.write(deleteMap); // read + IndexFileMeta indexFileMeta = + new IndexFileMeta(DELETION_VECTORS_INDEX, pair.getLeft(), 0L, 0L, pair.getRight()); Map dvs = - deletionVectorsIndexFile.readAllDeletionVectors(pair.getLeft(), pair.getRight()); + deletionVectorsIndexFile.readAllDeletionVectors(indexFileMeta); assertThat(dvs.size()).isEqualTo(100000); } @@ -128,8 +134,10 @@ public void testReadDvIndexWithEnormousDv() { deletionVectorsIndexFile.write(deleteMap); // read + IndexFileMeta indexFileMeta = + new IndexFileMeta(DELETION_VECTORS_INDEX, pair.getLeft(), 0L, 0L, pair.getRight()); Map dvs = - deletionVectorsIndexFile.readAllDeletionVectors(pair.getLeft(), pair.getRight()); + deletionVectorsIndexFile.readAllDeletionVectors(indexFileMeta); assertThat(dvs.size()).isEqualTo(1); } diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java index 9bfb4f81a86a..5a69541bb9a6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java @@ -56,8 +56,7 @@ public void test0() { assertThat(dvMaintainer.deletionVectorOf("f3")).isEmpty(); List fileMetas = dvMaintainer.prepareCommit(); - Map deletionVectors = - fileHandler.readAllDeletionVectors(fileMetas.get(0)); + Map deletionVectors = fileHandler.readAllDeletionVectors(fileMetas); assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue(); assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse(); assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse(); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index d603eda41e68..d51456cb0d81 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -757,7 +757,7 @@ public void testIndexFiles() throws Exception { .containsExactlyInAnyOrder(6, 8); // assert scan one bucket - Optional file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1, 0); + Optional file = indexFileHandler.scanHashIndex(snapshot.id(), part1, 0); assertThat(file).isPresent(); assertThat(indexFileHandler.readHashIndexList(file.get())).containsExactlyInAnyOrder(1, 4); @@ -766,9 +766,9 @@ public void testIndexFiles() throws Exception { store.overwriteData( Collections.singletonList(record1), gen::getPartition, kv -> 0, new HashMap<>()); snapshot = store.snapshotManager().latestSnapshot(); - file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1, 0); + file = indexFileHandler.scanHashIndex(snapshot.id(), part1, 0); assertThat(file).isEmpty(); - file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2, 2); + file = indexFileHandler.scanHashIndex(snapshot.id(), part2, 2); assertThat(file).isPresent(); // overwrite all partitions @@ -776,7 +776,7 @@ public void testIndexFiles() throws Exception { store.overwriteData( Collections.singletonList(record1), gen::getPartition, kv -> 0, new HashMap<>()); snapshot = store.snapshotManager().latestSnapshot(); - file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2, 2); + file = indexFileHandler.scanHashIndex(snapshot.id(), part2, 2); assertThat(file).isEmpty(); } From b60a4df152574ccd538fa02b54b43957e18d8464 Mon Sep 17 00:00:00 2001 From: Yann Date: Wed, 22 May 2024 17:04:42 +0800 Subject: [PATCH 2/7] [followup] UT --- .../DeletionVectorsMaintainer.java | 16 -- .../apache/paimon/index/IndexFileHandler.java | 13 -- .../apache/paimon/io/CompactIncrement.java | 6 + .../manifest/IndexManifestFileHandler.java | 2 +- .../apache/paimon/TestAppendFileStore.java | 169 ++++++++++++++++++ .../DeletionVectorsMaintainerTest.java | 59 ++++++ .../index/IndexFileMetaSerializerTest.java | 39 ++-- .../IndexManifestFileHandlerTest.java | 111 ++++++++++++ .../paimon/operation/FileStoreCommitTest.java | 53 ++++++ .../sink/CommitMessageSerializerTest.java | 4 +- 10 files changed, 426 insertions(+), 46 deletions(-) create mode 100644 paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java index 461e092e0396..ce9ad40a1f04 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java @@ -22,7 +22,6 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; -import org.apache.paimon.manifest.IndexManifestEntry; import javax.annotation.Nullable; @@ -31,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; @@ -140,20 +138,6 @@ public DeletionVectorsMaintainer createOrRestore( return createOrRestore(deletionVectors); } - @VisibleForTesting - public DeletionVectorsMaintainer createOrRestore( - @Nullable Long snapshotId, BinaryRow partition) { - List indexFiles = - snapshotId == null - ? Collections.emptyList() - : handler.scan(snapshotId, DELETION_VECTORS_INDEX, partition).stream() - .map(IndexManifestEntry::indexFile) - .collect(Collectors.toList()); - Map deletionVectors = - new HashMap<>(handler.readAllDeletionVectors(indexFiles)); - return createOrRestore(deletionVectors); - } - public DeletionVectorsMaintainer create() { return createOrRestore(new HashMap<>()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index 40c676362c77..f3138cb02ba5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -65,19 +65,6 @@ public IndexFileHandler( this.deletionVectorsIndex = deletionVectorsIndex; } - public DeletionVectorsIndexFile deletionVectorsIndex() { - return this.deletionVectorsIndex; - } - - public List scan() { - Snapshot snapshot = snapshotManager.latestSnapshot(); - String indexManifest = snapshot.indexManifest(); - if (indexManifest == null) { - return Collections.emptyList(); - } - return indexManifestFile.read(indexManifest); - } - public Optional scanHashIndex(long snapshotId, BinaryRow partition, int bucket) { List result = scan(snapshotId, HASH_INDEX, partition, bucket); if (result.size() > 1) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java index cc3e79f01bf2..23f64b5cd2f1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java @@ -18,6 +18,7 @@ package org.apache.paimon.io; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -88,4 +89,9 @@ public String toString() { .map(DataFileMeta::fileName) .collect(Collectors.joining(",\n"))); } + + public static CompactIncrement emptyIncrement() { + return new CompactIncrement( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java index d7a952f239b2..795b1e3aaf52 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java @@ -33,7 +33,7 @@ import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; -/** IndexManifestFile Handler */ +/** IndexManifestFile Handler. */ public class IndexManifestFileHandler { private final IndexManifestFile indexManifestFile; diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java new file mode 100644 index 000000000000..23280a2276b6 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.FileIOFinder; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.io.IndexIncrement; +import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.operation.FileStoreCommitImpl; +import org.apache.paimon.operation.Lock; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.SchemaUtils; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.CatalogEnvironment; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.TraceableFileIO; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; + +public class TestAppendFileStore extends AppendOnlyFileStore { + + private final String commitUser; + + private final IndexFileHandler fileHandler; + + private long commitIdentifier; + + private FileIO fileIO; + + public TestAppendFileStore( + FileIO fileIO, + SchemaManager schemaManage, + CoreOptions options, + TableSchema tableSchema, + RowType partitionType, + RowType bucketType, + RowType rowType, + String tableName) { + super( + fileIO, + schemaManage, + tableSchema, + options, + partitionType, + bucketType, + rowType, + tableName, + new CatalogEnvironment(Lock.emptyFactory(), null, null)); + + this.fileIO = fileIO; + this.commitUser = UUID.randomUUID().toString(); + this.fileHandler = this.newIndexFileHandler(); + this.commitIdentifier = 0L; + } + + public FileIO fileIO() { + return this.fileIO; + } + + public FileStoreCommitImpl newCommit() { + return super.newCommit(commitUser); + } + + public void commit(CommitMessage... commitMessages) { + ManifestCommittable committable = new ManifestCommittable(commitIdentifier++); + for (CommitMessage commitMessage : commitMessages) { + committable.addFileCommittable(commitMessage); + } + newCommit().commit(committable, Collections.emptyMap()); + } + + public CommitMessage removeIndexFiles( + BinaryRow partition, int bucket, List indexFileMetas) { + return new CommitMessageImpl( + partition, + bucket, + DataIncrement.emptyIncrement(), + CompactIncrement.emptyIncrement(), + new IndexIncrement(Collections.emptyList(), indexFileMetas)); + } + + public List scanDVIndexFiles(BinaryRow partition, int bucket) { + Long lastSnapshotId = snapshotManager().latestSnapshotId(); + return fileHandler.scan(lastSnapshotId, DELETION_VECTORS_INDEX, partition, bucket); + } + + public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow partition, int bucket) { + Long lastSnapshotId = snapshotManager().latestSnapshotId(); + DeletionVectorsMaintainer.Factory factory = + new DeletionVectorsMaintainer.Factory(fileHandler); + return factory.createOrRestore(lastSnapshotId, partition, bucket); + } + + public CommitMessage writeDVIndexFiles( + BinaryRow partition, int bucket, Map> dataFileToPositions) { + DeletionVectorsMaintainer dvMaintainer = createOrRestoreDVMaintainer(partition, bucket); + for (Map.Entry> entry : dataFileToPositions.entrySet()) { + for (Integer pos : entry.getValue()) { + dvMaintainer.notifyNewDeletion(entry.getKey(), pos); + } + } + return new CommitMessageImpl( + partition, + bucket, + DataIncrement.emptyIncrement(), + CompactIncrement.emptyIncrement(), + new IndexIncrement(dvMaintainer.prepareCommit())); + } + + public static TestAppendFileStore createAppendStore( + java.nio.file.Path tempDir, Map options) throws Exception { + String root = TraceableFileIO.SCHEME + "://" + tempDir.toString(); + Path path = new Path(tempDir.toUri()); + FileIO fileIO = FileIOFinder.find(new Path(root)); + SchemaManager schemaManage = new SchemaManager(new LocalFileIO(), path); + + options.put(CoreOptions.PATH.key(), root); + TableSchema tableSchema = + SchemaUtils.forceCommit( + schemaManage, + new Schema( + TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), + TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), + Collections.emptyList(), + options, + null)); + return new TestAppendFileStore( + fileIO, + schemaManage, + new CoreOptions(options), + tableSchema, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + RowType.of(), + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + (new Path(root)).getName()); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java index 5a69541bb9a6..dd6d3102c899 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java @@ -22,10 +22,17 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.io.IndexIncrement; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -63,4 +70,56 @@ public void test0() { assertThat(deletionVectors.get("f2").isDeleted(2)).isTrue(); assertThat(deletionVectors.containsKey("f3")).isFalse(); } + + @Test + public void test1() { + DeletionVectorsMaintainer.Factory factory = + new DeletionVectorsMaintainer.Factory(fileHandler); + + DeletionVectorsMaintainer dvMaintainer = factory.create(); + BitmapDeletionVector deletionVector1 = new BitmapDeletionVector(); + deletionVector1.delete(1); + deletionVector1.delete(3); + deletionVector1.delete(5); + dvMaintainer.notifyNewDeletion("f1", deletionVector1); + + List fileMetas1 = dvMaintainer.prepareCommit(); + assertThat(fileMetas1.size()).isEqualTo(1); + CommitMessage commitMessage = + new CommitMessageImpl( + BinaryRow.EMPTY_ROW, + 0, + DataIncrement.emptyIncrement(), + CompactIncrement.emptyIncrement(), + new IndexIncrement(fileMetas1)); + BatchTableCommit commit = table.newBatchWriteBuilder().newCommit(); + commit.commit(Collections.singletonList(commitMessage)); + + Long lastSnapshotId = table.snapshotManager().latestSnapshotId(); + dvMaintainer = factory.createOrRestore(lastSnapshotId, BinaryRow.EMPTY_ROW, 0); + DeletionVector deletionVector2 = dvMaintainer.deletionVectorOf("f1").get(); + assertThat(deletionVector2.isDeleted(1)).isTrue(); + assertThat(deletionVector2.isDeleted(2)).isFalse(); + + deletionVector2.delete(2); + dvMaintainer.notifyNewDeletion("f1", deletionVector2); + + List fileMetas2 = dvMaintainer.prepareCommit(); + assertThat(fileMetas2.size()).isEqualTo(1); + commitMessage = + new CommitMessageImpl( + BinaryRow.EMPTY_ROW, + 0, + DataIncrement.emptyIncrement(), + CompactIncrement.emptyIncrement(), + new IndexIncrement(fileMetas2)); + commit = table.newBatchWriteBuilder().newCommit(); + commit.commit(Collections.singletonList(commitMessage)); + + lastSnapshotId = table.snapshotManager().latestSnapshotId(); + dvMaintainer = factory.createOrRestore(lastSnapshotId, BinaryRow.EMPTY_ROW, 0); + DeletionVector deletionVector3 = dvMaintainer.deletionVectorOf("f1").get(); + assertThat(deletionVector3.isDeleted(1)).isTrue(); + assertThat(deletionVector3.isDeleted(2)).isTrue(); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java index 53058a05f93c..724d5b416359 100644 --- a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java @@ -42,22 +42,31 @@ protected IndexFileMeta object() { public static IndexFileMeta randomIndexFile() { Random rnd = new Random(); if (rnd.nextBoolean()) { - return new IndexFileMeta( - HashIndexFile.HASH_INDEX, - "my_file_name" + rnd.nextLong(), - rnd.nextInt(), - rnd.nextInt()); + return randomHashIndexFile(); } else { - LinkedHashMap> deletionVectorsRanges = - new LinkedHashMap<>(); - deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(), rnd.nextInt())); - deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(), rnd.nextInt())); - return new IndexFileMeta( - DeletionVectorsIndexFile.DELETION_VECTORS_INDEX, - "deletion_vectors_index_file_name" + rnd.nextLong(), - rnd.nextInt(), - rnd.nextInt(), - deletionVectorsRanges); + return randomDeletionVectorIndexFile(); } } + + public static IndexFileMeta randomHashIndexFile() { + Random rnd = new Random(); + return new IndexFileMeta( + HashIndexFile.HASH_INDEX, + "my_file_name" + rnd.nextLong(), + rnd.nextInt(), + rnd.nextInt()); + } + + public static IndexFileMeta randomDeletionVectorIndexFile() { + Random rnd = new Random(); + LinkedHashMap> deletionVectorsRanges = new LinkedHashMap<>(); + deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(), rnd.nextInt())); + deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(), rnd.nextInt())); + return new IndexFileMeta( + DeletionVectorsIndexFile.DELETION_VECTORS_INDEX, + "deletion_vectors_index_file_name" + rnd.nextLong(), + rnd.nextInt(), + rnd.nextInt(), + deletionVectorsRanges); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java new file mode 100644 index 000000000000..67f7cc28eec4 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.manifest; + +import org.apache.paimon.TestAppendFileStore; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.table.BucketMode; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import static org.apache.paimon.index.IndexFileMetaSerializerTest.randomDeletionVectorIndexFile; +import static org.assertj.core.api.Assertions.assertThat; + +public class IndexManifestFileHandlerTest { + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testUnawareMode() throws Exception { + TestAppendFileStore fileStore = + TestAppendFileStore.createAppendStore(tempDir, new HashMap<>()); + + IndexManifestFile indexManifestFile = + new IndexManifestFile.Factory( + fileStore.fileIO(), + fileStore.options().manifestFormat(), + fileStore.pathFactory()) + .create(); + IndexManifestFileHandler indexManifestFileHandler = + new IndexManifestFileHandler(indexManifestFile, BucketMode.BUCKET_UNAWARE); + + IndexManifestEntry entry1 = + new IndexManifestEntry( + FileKind.ADD, BinaryRow.EMPTY_ROW, 0, randomDeletionVectorIndexFile()); + String indexManifestFile1 = indexManifestFileHandler.write(null, Arrays.asList(entry1)); + + IndexManifestEntry entry2 = entry1.toDeleteEntry(); + IndexManifestEntry entry3 = + new IndexManifestEntry( + FileKind.ADD, BinaryRow.EMPTY_ROW, 0, randomDeletionVectorIndexFile()); + String indexManifestFile2 = + indexManifestFileHandler.write(indexManifestFile1, Arrays.asList(entry2, entry3)); + + List entries = indexManifestFile.read(indexManifestFile2); + assertThat(entries.size()).isEqualTo(1); + assertThat(entries.contains(entry1)).isFalse(); + assertThat(entries.contains(entry2)).isFalse(); + assertThat(entries.contains(entry3)).isTrue(); + } + + @Test + public void testHashFixedBucket() throws Exception { + TestAppendFileStore fileStore = + TestAppendFileStore.createAppendStore(tempDir, new HashMap<>()); + + IndexManifestFile indexManifestFile = + new IndexManifestFile.Factory( + fileStore.fileIO(), + fileStore.options().manifestFormat(), + fileStore.pathFactory()) + .create(); + IndexManifestFileHandler indexManifestFileHandler = + new IndexManifestFileHandler(indexManifestFile, BucketMode.HASH_FIXED); + + IndexManifestEntry entry1 = + new IndexManifestEntry( + FileKind.ADD, BinaryRow.EMPTY_ROW, 0, randomDeletionVectorIndexFile()); + IndexManifestEntry entry2 = + new IndexManifestEntry( + FileKind.ADD, BinaryRow.EMPTY_ROW, 1, randomDeletionVectorIndexFile()); + String indexManifestFile1 = + indexManifestFileHandler.write(null, Arrays.asList(entry1, entry2)); + + IndexManifestEntry entry3 = + new IndexManifestEntry( + FileKind.ADD, BinaryRow.EMPTY_ROW, 1, randomDeletionVectorIndexFile()); + IndexManifestEntry entry4 = + new IndexManifestEntry( + FileKind.ADD, BinaryRow.EMPTY_ROW, 2, randomDeletionVectorIndexFile()); + String indexManifestFile2 = + indexManifestFileHandler.write(indexManifestFile1, Arrays.asList(entry3, entry4)); + + List entries = indexManifestFile.read(indexManifestFile2); + assertThat(entries.size()).isEqualTo(3); + assertThat(entries.contains(entry1)).isTrue(); + assertThat(entries.contains(entry2)).isFalse(); + assertThat(entries.contains(entry3)).isTrue(); + assertThat(entries.contains(entry4)).isTrue(); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index d51456cb0d81..6437c06fed33 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -21,9 +21,12 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.Snapshot; +import org.apache.paimon.TestAppendFileStore; import org.apache.paimon.TestFileStore; import org.apache.paimon.TestKeyValueGenerator; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.deletionvectors.DeletionVector; +import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.index.IndexFileHandler; @@ -39,6 +42,8 @@ import org.apache.paimon.stats.ColStats; import org.apache.paimon.stats.Statistics; import org.apache.paimon.stats.StatsFileHandler; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; @@ -57,6 +62,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -843,6 +849,53 @@ public void testWriteStats() throws Exception { assertThat(readStats.get()).isEqualTo(fakeStats); } + @Test + public void testDVIndexFiles() throws Exception { + TestAppendFileStore store = + TestAppendFileStore.createAppendStore(tempDir, Collections.emptyMap()); + + // commit 1 + CommitMessage commitMessage1 = + store.writeDVIndexFiles( + BinaryRow.EMPTY_ROW, + 0, + Collections.singletonMap("f1", Arrays.asList(1, 3))); + CommitMessage commitMessage2 = + store.writeDVIndexFiles( + BinaryRow.EMPTY_ROW, + 0, + Collections.singletonMap("f2", Arrays.asList(2, 4))); + store.commit(commitMessage1, commitMessage2); + + // assert 1 + assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(2); + DeletionVectorsMaintainer maintainer = + store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0); + Map dvs = maintainer.deletionVectors(); + assertThat(dvs.size()).isEqualTo(2); + assertThat(dvs.get("f2").isDeleted(2)).isTrue(); + assertThat(dvs.get("f2").isDeleted(3)).isFalse(); + assertThat(dvs.get("f2").isDeleted(4)).isTrue(); + + // commit 2 + CommitMessage commitMessage3 = + store.writeDVIndexFiles( + BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2", Arrays.asList(3))); + CommitMessage commitMessage4 = + store.removeIndexFiles( + BinaryRow.EMPTY_ROW, + 0, + ((CommitMessageImpl) commitMessage1).indexIncrement().newIndexFiles()); + store.commit(commitMessage3, commitMessage4); + + // assert 2 + assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(2); + maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0); + dvs = maintainer.deletionVectors(); + assertThat(dvs.size()).isEqualTo(2); + assertThat(dvs.get("f2").isDeleted(3)).isTrue(); + } + private TestFileStore createStore(boolean failing) throws Exception { return createStore(failing, 1); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java index 190027d361d3..47a2107426d1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java @@ -42,7 +42,9 @@ public void test() throws IOException { DataIncrement dataIncrement = randomNewFilesIncrement(); CompactIncrement compactIncrement = randomCompactIncrement(); IndexIncrement indexIncrement = - new IndexIncrement(Arrays.asList(randomIndexFile(), randomIndexFile())); + new IndexIncrement( + Arrays.asList(randomIndexFile(), randomIndexFile()), + Arrays.asList(randomIndexFile(), randomIndexFile())); CommitMessageImpl committable = new CommitMessageImpl(row(0), 1, dataIncrement, compactIncrement, indexIncrement); CommitMessageImpl newCommittable = From e2580910bd9bc981d89ada6bcc9341ad1561a0de Mon Sep 17 00:00:00 2001 From: Yann Date: Wed, 22 May 2024 17:15:11 +0800 Subject: [PATCH 3/7] [followup] ut --- .../src/test/java/org/apache/paimon/TestAppendFileStore.java | 1 + .../org/apache/paimon/manifest/IndexManifestFileHandlerTest.java | 1 + 2 files changed, 2 insertions(+) diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java index 23280a2276b6..11612ac1bb48 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java @@ -49,6 +49,7 @@ import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; +/** Wrapper of AppendOnlyFileStore. */ public class TestAppendFileStore extends AppendOnlyFileStore { private final String commitUser; diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java index 67f7cc28eec4..4ca59a75c091 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java @@ -32,6 +32,7 @@ import static org.apache.paimon.index.IndexFileMetaSerializerTest.randomDeletionVectorIndexFile; import static org.assertj.core.api.Assertions.assertThat; +/** Test for IndexManifestFileHandler. */ public class IndexManifestFileHandlerTest { @TempDir java.nio.file.Path tempDir; From 42e59ea061b3447a9f6c0cf67aec0bde5cd6963d Mon Sep 17 00:00:00 2001 From: Yann Date: Wed, 22 May 2024 17:16:34 +0800 Subject: [PATCH 4/7] [followup] UT --- .../org/apache/paimon/manifest/IndexManifestFileHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java index 795b1e3aaf52..629e856f5430 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java @@ -125,7 +125,7 @@ public List combine( } } - /** We combine the previous and new index files by {@link IndexManifestEntry#identifier} */ + /** We combine the previous and new index files by {@link IndexManifestEntry#identifier}. */ static class CommonBucketCombiner implements IndexManifestFileCombiner { @Override From f68cea80f0b53ea7752ba12ac4e9d46a761d38e5 Mon Sep 17 00:00:00 2001 From: Yann Date: Wed, 22 May 2024 17:36:42 +0800 Subject: [PATCH 5/7] [followup] UT --- .../java/org/apache/paimon/operation/FileStoreCommitTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 6437c06fed33..d7ad6924a917 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -852,7 +852,7 @@ public void testWriteStats() throws Exception { @Test public void testDVIndexFiles() throws Exception { TestAppendFileStore store = - TestAppendFileStore.createAppendStore(tempDir, Collections.emptyMap()); + TestAppendFileStore.createAppendStore(tempDir, new HashMap<>()); // commit 1 CommitMessage commitMessage1 = From 86e55b580ccdb0895bc37fd4078bd93d06fa4a72 Mon Sep 17 00:00:00 2001 From: Yann Date: Wed, 22 May 2024 17:44:20 +0800 Subject: [PATCH 6/7] [followup] UT --- .../java/org/apache/paimon/operation/FileStoreCommitTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index d7ad6924a917..03630e71fd2f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -851,8 +851,7 @@ public void testWriteStats() throws Exception { @Test public void testDVIndexFiles() throws Exception { - TestAppendFileStore store = - TestAppendFileStore.createAppendStore(tempDir, new HashMap<>()); + TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, new HashMap<>()); // commit 1 CommitMessage commitMessage1 = From 9f723847af451d00d0e079253612a9b2f4a4f9ac Mon Sep 17 00:00:00 2001 From: Yann Date: Wed, 22 May 2024 19:55:01 +0800 Subject: [PATCH 7/7] [followup] comments --- .../DeletionVectorsIndexFile.java | 5 - .../paimon/manifest/IndexManifestEntry.java | 55 ----------- .../manifest/IndexManifestFileHandler.java | 93 +++++++++++++++---- 3 files changed, 77 insertions(+), 76 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java index a6283ba97ec4..c0cc090398e8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -57,10 +56,6 @@ public DeletionVectorsIndexFile(FileIO fileIO, PathFactory pathFactory) { public Map readAllDeletionVectors(IndexFileMeta fileMeta) { LinkedHashMap> deletionVectorRanges = fileMeta.deletionVectorsRanges(); - if (deletionVectorRanges == null || deletionVectorRanges.isEmpty()) { - return Collections.emptyMap(); - } - String indexFileName = fileMeta.fileName(); Map deletionVectors = new HashMap<>(); Path filePath = pathFactory.toPath(indexFileName); diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java index 53f3fb82de8c..fc4fd03e4335 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java @@ -94,10 +94,6 @@ public static RowType schema() { return new RowType(fields); } - public Identifier identifier() { - return new Identifier(partition, bucket, indexFile.indexType()); - } - @Override public boolean equals(Object o) { if (this == o) { @@ -131,55 +127,4 @@ public String toString() { + indexFile + '}'; } - - /** The {@link Identifier} of a {@link IndexFileMeta}. */ - public static class Identifier { - - public final BinaryRow partition; - public final int bucket; - public final String indexType; - - private Integer hash; - - private Identifier(BinaryRow partition, int bucket, String indexType) { - this.partition = partition; - this.bucket = bucket; - this.indexType = indexType; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Identifier that = (Identifier) o; - return bucket == that.bucket - && Objects.equals(partition, that.partition) - && Objects.equals(indexType, that.indexType); - } - - @Override - public int hashCode() { - if (hash == null) { - hash = Objects.hash(partition, bucket, indexType); - } - return hash; - } - - @Override - public String toString() { - return "Identifier{" - + "partition=" - + partition - + ", bucket=" - + bucket - + ", indexType='" - + indexType - + '\'' - + '}'; - } - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java index 629e856f5430..2c62e3ef41bc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java @@ -18,17 +18,19 @@ package org.apache.paimon.manifest; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.table.BucketMode; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Preconditions; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.LinkedHashMap; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; +import java.util.Objects; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; @@ -50,6 +52,10 @@ String write(@Nullable String previousIndexManifest, List ne previousIndexManifest == null ? new ArrayList<>() : indexManifestFile.read(previousIndexManifest); + for (IndexManifestEntry entry : entries) { + Preconditions.checkArgument(entry.kind() == FileKind.ADD); + } + Pair, List> previous = separateIndexEntries(entries); Pair, List> current = @@ -107,11 +113,9 @@ static class UnawareBucketCombiner implements IndexManifestFileCombiner { @Override public List combine( List prevIndexFiles, List newIndexFiles) { - Map indexEntries = new LinkedHashMap<>(); + Map indexEntries = new HashMap<>(); for (IndexManifestEntry entry : prevIndexFiles) { - if (entry.kind() == FileKind.ADD) { - indexEntries.put(entry.indexFile().fileName(), entry); - } + indexEntries.put(entry.indexFile().fileName(), entry); } for (IndexManifestEntry entry : newIndexFiles) { @@ -125,26 +129,83 @@ public List combine( } } - /** We combine the previous and new index files by {@link IndexManifestEntry#identifier}. */ + /** We combine the previous and new index files by {@link Identifier}. */ static class CommonBucketCombiner implements IndexManifestFileCombiner { @Override public List combine( List prevIndexFiles, List newIndexFiles) { - Map indexEntries = - prevIndexFiles.stream() - .filter(entry -> entry.kind() == FileKind.ADD) - .collect( - Collectors.toMap( - IndexManifestEntry::identifier, Function.identity())); + Map indexEntries = new HashMap<>(); + for (IndexManifestEntry entry : prevIndexFiles) { + indexEntries.put(identifier(entry), entry); + } + for (IndexManifestEntry entry : newIndexFiles) { if (entry.kind() == FileKind.ADD) { - indexEntries.put(entry.identifier(), entry); + indexEntries.put(identifier(entry), entry); } else { - indexEntries.remove(entry.identifier()); + indexEntries.remove(identifier(entry)); } } return new ArrayList<>(indexEntries.values()); } } + + private static Identifier identifier(IndexManifestEntry indexManifestEntry) { + return new Identifier( + indexManifestEntry.partition(), + indexManifestEntry.bucket(), + indexManifestEntry.indexFile().indexType()); + } + + /** The {@link Identifier} of a {@link IndexFileMeta}. */ + public static class Identifier { + + public final BinaryRow partition; + public final int bucket; + public final String indexType; + + private Integer hash; + + private Identifier(BinaryRow partition, int bucket, String indexType) { + this.partition = partition; + this.bucket = bucket; + this.indexType = indexType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Identifier that = (Identifier) o; + return bucket == that.bucket + && Objects.equals(partition, that.partition) + && Objects.equals(indexType, that.indexType); + } + + @Override + public int hashCode() { + if (hash == null) { + hash = Objects.hash(partition, bucket, indexType); + } + return hash; + } + + @Override + public String toString() { + return "Identifier{" + + "partition=" + + partition + + ", bucket=" + + bucket + + ", indexType='" + + indexType + + '\'' + + '}'; + } + } }