Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ public FileStoreCommitImpl newCommit(String commitUser) {
partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(),
newKeyComparator(),
options.branch(),
newStatsFileHandler());
newStatsFileHandler(),
bucketMode());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.index.IndexFile;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;

Expand All @@ -48,18 +49,16 @@ public DeletionVectorsIndexFile(FileIO fileIO, PathFactory pathFactory) {
/**
* Reads all deletion vectors from a specified file.
*
* @param fileName The name of the file from which to read the deletion vectors.
* @param deletionVectorRanges A map where the key represents which file the DeletionVector
* belongs to and the value is a Pair object specifying the range (start position and size)
* within the file where the deletion vector data is located.
* @return A map where the key represents which file the DeletionVector belongs to, and the
* value is the corresponding DeletionVector object.
* @throws UncheckedIOException If an I/O error occurs while reading from the file.
*/
public Map<String, DeletionVector> readAllDeletionVectors(
String fileName, LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges) {
public Map<String, DeletionVector> readAllDeletionVectors(IndexFileMeta fileMeta) {
LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges =
fileMeta.deletionVectorsRanges();
String indexFileName = fileMeta.fileName();
Map<String, DeletionVector> deletionVectors = new HashMap<>();
Path filePath = pathFactory.toPath(fileName);
Path filePath = pathFactory.toPath(indexFileName);
try (SeekableInputStream inputStream = fileIO.newInputStream(filePath)) {
checkVersion(inputStream);
DataInputStream dataInputStream = new DataInputStream(inputStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,12 @@ public Factory(IndexFileHandler handler) {

public DeletionVectorsMaintainer createOrRestore(
@Nullable Long snapshotId, BinaryRow partition, int bucket) {
IndexFileMeta indexFile =
List<IndexFileMeta> indexFiles =
snapshotId == null
? null
: handler.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
.orElse(null);
? Collections.emptyList()
: handler.scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket);
Map<String, DeletionVector> deletionVectors =
indexFile == null
? new HashMap<>()
: new HashMap<>(handler.readAllDeletionVectors(indexFile));
new HashMap<>(handler.readAllDeletionVectors(indexFiles));
return createOrRestore(deletionVectors);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private HashIndexMaintainer(
IntHashSet hashcode = new IntHashSet();
if (snapshotId != null) {
Optional<IndexFileMeta> indexFile =
fileHandler.scan(snapshotId, HashIndexFile.HASH_INDEX, partition, bucket);
fileHandler.scanHashIndex(snapshotId, partition, bucket);
if (indexFile.isPresent()) {
IndexFileMeta file = indexFile.get();
hashcode = new IntHashSet((int) file.rowCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -64,20 +65,25 @@ public IndexFileHandler(
this.deletionVectorsIndex = deletionVectorsIndex;
}

public Optional<IndexFileMeta> scan(
/**
 * Looks up the hash index file of one bucket in the given snapshot.
 *
 * <p>Delegates to {@code scan(snapshotId, HASH_INDEX, partition, bucket)} and narrows the
 * returned list to at most one element.
 *
 * @return an empty {@link Optional} when the scan finds nothing, otherwise the single file
 * @throws IllegalArgumentException if the scan returns more than one file for the bucket
 */
public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow partition, int bucket) {
    List<IndexFileMeta> hashIndexFiles = scan(snapshotId, HASH_INDEX, partition, bucket);
    if (hashIndexFiles.isEmpty()) {
        return Optional.empty();
    }
    if (hashIndexFiles.size() == 1) {
        return Optional.of(hashIndexFiles.get(0));
    }
    // More than one entry is rejected rather than silently picking one of them.
    throw new IllegalArgumentException(
            "Find multiple hash index files for one bucket: " + hashIndexFiles);
}

public List<IndexFileMeta> scan(
long snapshotId, String indexType, BinaryRow partition, int bucket) {
List<IndexManifestEntry> entries = scan(snapshotId, indexType, partition);
List<IndexManifestEntry> result = new ArrayList<>();
List<IndexFileMeta> result = new ArrayList<>();
for (IndexManifestEntry file : entries) {
if (file.bucket() == bucket) {
result.add(file);
result.add(file.indexFile());
}
}
if (result.size() > 1) {
throw new IllegalArgumentException(
"Find multiple index files for one bucket: " + result);
}
return result.isEmpty() ? Optional.empty() : Optional.of(result.get(0).indexFile());
return result;
}

public List<IndexManifestEntry> scan(String indexType, BinaryRow partition) {
Expand Down Expand Up @@ -167,31 +173,16 @@ public void deleteManifest(String indexManifest) {
indexManifestFile.delete(indexManifest);
}

/**
 * Reads every deletion vector recorded in one deletion vectors index file.
 *
 * @param fileMeta metadata of the index file; its index type must equal
 *     {@code DELETION_VECTORS_INDEX}
 * @return the map produced by {@code deletionVectorsIndex}, or an empty map when the metadata
 *     carries no ranges (null or empty)
 * @throws IllegalArgumentException if the index type is not {@code DELETION_VECTORS_INDEX}
 */
public Map<String, DeletionVector> readAllDeletionVectors(IndexFileMeta fileMeta) {
    String indexType = fileMeta.indexType();
    if (!indexType.equals(DELETION_VECTORS_INDEX)) {
        throw new IllegalArgumentException(
                "Input file is not deletion vectors index " + indexType);
    }

    LinkedHashMap<String, Pair<Integer, Integer>> ranges = fileMeta.deletionVectorsRanges();
    if (ranges != null && !ranges.isEmpty()) {
        return deletionVectorsIndex.readAllDeletionVectors(fileMeta.fileName(), ranges);
    }
    // Nothing recorded for this file, so the index file itself is never opened.
    return Collections.emptyMap();
}

public Optional<DeletionVector> readDeletionVector(IndexFileMeta fileMeta, String fileName) {
if (!fileMeta.indexType().equals(DELETION_VECTORS_INDEX)) {
throw new IllegalArgumentException(
"Input file is not deletion vectors index " + fileMeta.indexType());
}
Map<String, Pair<Integer, Integer>> deleteIndexRange = fileMeta.deletionVectorsRanges();
if (deleteIndexRange == null || !deleteIndexRange.containsKey(fileName)) {
return Optional.empty();
public Map<String, DeletionVector> readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
Map<String, DeletionVector> deletionVectors = new HashMap<>();
for (IndexFileMeta indexFile : fileMetas) {
if (!indexFile.indexType().equals(DELETION_VECTORS_INDEX)) {
throw new IllegalArgumentException(
"Input file is not deletion vectors index " + indexFile.indexType());
}
deletionVectors.putAll(deletionVectorsIndex.readAllDeletionVectors(indexFile));
}
return Optional.of(
deletionVectorsIndex.readDeletionVector(
fileMeta.fileName(), deleteIndexRange.get(fileName)));
return deletionVectors;
}

public IndexFileMeta writeDeletionVectorsIndex(Map<String, DeletionVector> deletionVectors) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.io;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -88,4 +89,9 @@ public String toString() {
.map(DataFileMeta::fileName)
.collect(Collectors.joining(",\n")));
}

/**
 * Creates a {@link CompactIncrement} whose three constructor arguments are all empty lists.
 *
 * @return a new increment built from three {@code Collections.emptyList()} arguments
 */
public static CompactIncrement emptyIncrement() {
return new CompactIncrement(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
}
}
31 changes: 27 additions & 4 deletions paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.paimon.index.IndexFileMeta;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

Expand All @@ -28,16 +30,29 @@ public class IndexIncrement {

private final List<IndexFileMeta> newIndexFiles;

private final List<IndexFileMeta> deletedIndexFiles;

/**
 * Creates an increment that only adds index files; the deleted list is an empty list.
 *
 * @param newIndexFiles index files added by this increment, stored without copying
 */
public IndexIncrement(List<IndexFileMeta> newIndexFiles) {
    this(newIndexFiles, Collections.emptyList());
}

/**
 * Creates an increment carrying both added and deleted index files.
 *
 * @param newIndexFiles stored by reference (no copy) as the added index files
 * @param deletedIndexFiles stored by reference (no copy) as the deleted index files
 */
public IndexIncrement(
List<IndexFileMeta> newIndexFiles, List<IndexFileMeta> deletedIndexFiles) {
this.newIndexFiles = newIndexFiles;
this.deletedIndexFiles = deletedIndexFiles;
}

/** Returns the added index files exactly as supplied to the constructor (no copy is made). */
public List<IndexFileMeta> newIndexFiles() {
return newIndexFiles;
}

/**
 * Returns the deleted index files held by this increment (no copy is made); this is an empty
 * list when the single-argument constructor was used.
 */
public List<IndexFileMeta> deletedIndexFiles() {
return deletedIndexFiles;
}

public boolean isEmpty() {
return newIndexFiles.isEmpty();
return newIndexFiles.isEmpty() && deletedIndexFiles.isEmpty();
}

@Override
Expand All @@ -49,16 +64,24 @@ public boolean equals(Object o) {
return false;
}
IndexIncrement that = (IndexIncrement) o;
return Objects.equals(newIndexFiles, that.newIndexFiles);
return Objects.equals(newIndexFiles, that.newIndexFiles)
&& Objects.equals(deletedIndexFiles, that.deletedIndexFiles);
}

@Override
public int hashCode() {
return Objects.hash(newIndexFiles);
List<IndexFileMeta> all = new ArrayList<>(newIndexFiles);
all.addAll(deletedIndexFiles);
return Objects.hash(all);
}

@Override
public String toString() {
return "IndexIncrement{" + "newIndexFiles=" + newIndexFiles + '}';
return "IndexIncrement{"
+ "newIndexFiles="
+ newIndexFiles
+ ",deletedIndexFiles="
+ deletedIndexFiles
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ public static RowType schema() {
return new RowType(fields);
}

/**
 * Builds the {@link Identifier} of this entry from its partition, its bucket and the index type
 * of its index file. A new instance is created on every call.
 */
public Identifier identifier() {
return new Identifier(partition, bucket, indexFile.indexType());
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down Expand Up @@ -131,55 +127,4 @@ public String toString() {
+ indexFile
+ '}';
}

/**
 * The {@link Identifier} of a {@link IndexFileMeta}: the combination of partition, bucket and
 * index type. Two identifiers are equal when all three components are equal.
 */
public static class Identifier {

    public final BinaryRow partition;
    public final int bucket;
    public final String indexType;

    // Cached result of hashCode(); stays null until the first call.
    private Integer hash;

    private Identifier(BinaryRow partition, int bucket, String indexType) {
        this.partition = partition;
        this.bucket = bucket;
        this.indexType = indexType;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        Identifier other = (Identifier) o;
        // Compare the primitive first, then the two object components.
        if (bucket != other.bucket) {
            return false;
        }
        return Objects.equals(partition, other.partition)
                && Objects.equals(indexType, other.indexType);
    }

    @Override
    public int hashCode() {
        Integer cached = hash;
        if (cached == null) {
            cached = Objects.hash(partition, bucket, indexType);
            hash = cached;
        }
        return cached;
    }

    @Override
    public String toString() {
        return "Identifier{partition="
                + partition
                + ", bucket="
                + bucket
                + ", indexType='"
                + indexType
                + "'}";
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.IndexManifestEntry.Identifier;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ObjectsFile;
Expand All @@ -31,10 +31,7 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Index manifest file. */
public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
Expand All @@ -53,27 +50,17 @@ private IndexManifestFile(
null);
}

/** Merge new index files to index manifest. */
/** Write new index files to index manifest. */
@Nullable
public String merge(
@Nullable String previousIndexManifest, List<IndexManifestEntry> newIndexFiles) {
String indexManifest = previousIndexManifest;
if (newIndexFiles.size() > 0) {
Map<Identifier, IndexManifestEntry> indexEntries = new LinkedHashMap<>();
List<IndexManifestEntry> entries =
indexManifest == null ? new ArrayList<>() : read(indexManifest);
entries.addAll(newIndexFiles);
for (IndexManifestEntry file : entries) {
if (file.kind() == FileKind.ADD) {
indexEntries.put(file.identifier(), file);
} else {
indexEntries.remove(file.identifier());
}
}
indexManifest = writeWithoutRolling(indexEntries.values());
public String writeIndexFiles(
@Nullable String previousIndexManifest,
List<IndexManifestEntry> newIndexFiles,
BucketMode bucketMode) {
if (newIndexFiles.isEmpty()) {
return previousIndexManifest;
}

return indexManifest;
IndexManifestFileHandler handler = new IndexManifestFileHandler(this, bucketMode);
return handler.write(previousIndexManifest, newIndexFiles);
}

/** Creator of {@link IndexManifestFile}. */
Expand Down
Loading