diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java new file mode 100644 index 000000000000..210e56ed9dff --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.fs.Path; +import org.apache.paimon.iceberg.manifest.IcebergConversions; +import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta; +import org.apache.paimon.iceberg.manifest.IcebergManifestEntry; +import org.apache.paimon.iceberg.manifest.IcebergManifestFile; +import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta; +import org.apache.paimon.iceberg.manifest.IcebergManifestList; +import org.apache.paimon.iceberg.manifest.IcebergPartitionSummary; +import org.apache.paimon.iceberg.metadata.IcebergMetadata; +import org.apache.paimon.iceberg.metadata.IcebergPartitionField; +import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec; +import org.apache.paimon.iceberg.metadata.IcebergSchema; +import org.apache.paimon.iceberg.metadata.IcebergSnapshot; +import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.options.Options; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitCallback; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.RawFile; +import org.apache.paimon.table.source.ScanMode; +import org.apache.paimon.table.source.snapshot.SnapshotReader; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.SnapshotManager; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * A {@link CommitCallback} to create Iceberg compatible 
metadata, so Iceberg readers can read + * Paimon's {@link RawFile}. + */ +public abstract class AbstractIcebergCommitCallback implements CommitCallback { + + // see org.apache.iceberg.hadoop.Util + private static final String VERSION_HINT_FILENAME = "version-hint.text"; + + protected final FileStoreTable table; + private final String commitUser; + private final IcebergPathFactory pathFactory; + private final FileStorePathFactory fileStorePathFactory; + + private final IcebergManifestFile manifestFile; + private final IcebergManifestList manifestList; + + public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) { + this.table = table; + this.commitUser = commitUser; + this.pathFactory = new IcebergPathFactory(table.location()); + this.fileStorePathFactory = table.store().pathFactory(); + + RowType partitionType = table.schema().logicalPartitionType(); + RowType entryType = IcebergManifestEntry.schema(partitionType); + Options manifestFileAvroOptions = Options.fromMap(table.options()); + // https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestReader.java + manifestFileAvroOptions.set( + "avro.row-name-mapping", + "org.apache.paimon.avro.generated.record:manifest_entry," + + "manifest_entry_data_file:r2," + + "r2_partition:r102"); + FileFormat manifestFileAvro = FileFormat.getFileFormat(manifestFileAvroOptions, "avro"); + this.manifestFile = + new IcebergManifestFile( + table.fileIO(), + partitionType, + manifestFileAvro.createReaderFactory(entryType), + manifestFileAvro.createWriterFactory(entryType), + table.coreOptions().manifestCompression(), + pathFactory.manifestFileFactory(), + table.coreOptions().manifestTargetSize()); + + Options manifestListAvroOptions = Options.fromMap(table.options()); + // https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestLists.java + manifestListAvroOptions.set( + "avro.row-name-mapping", + "org.apache.paimon.avro.generated.record:manifest_file," + + "manifest_file_partitions:r508"); + FileFormat manifestListAvro = FileFormat.getFileFormat(manifestListAvroOptions, "avro"); + this.manifestList = + new IcebergManifestList( + table.fileIO(), + manifestListAvro.createReaderFactory(IcebergManifestFileMeta.schema()), + manifestListAvro.createWriterFactory(IcebergManifestFileMeta.schema()), + table.coreOptions().manifestCompression(), + pathFactory.manifestListFactory()); + } + + @Override + public void call(List committedEntries, Snapshot snapshot) { + createMetadata( + snapshot.id(), + (removedFiles, addedFiles) -> + collectFileChanges(committedEntries, removedFiles, addedFiles)); + } + + @Override + public void retry(ManifestCommittable committable) { + SnapshotManager snapshotManager = table.snapshotManager(); + long snapshotId = + snapshotManager + .findSnapshotsForIdentifiers( + commitUser, Collections.singletonList(committable.identifier())) + .stream() + .mapToLong(Snapshot::id) + .max() + .orElseThrow( + () -> + new RuntimeException( + "There is no snapshot for commit user " + + commitUser + + " and identifier " + + committable.identifier() + + ". 
This is unexpected.")); + createMetadata( + snapshotId, + (removedFiles, addedFiles) -> + collectFileChanges(snapshotId, removedFiles, addedFiles)); + } + + private void createMetadata(long snapshotId, FileChangesCollector fileChangesCollector) { + try { + if (table.fileIO().exists(pathFactory.toMetadataPath(snapshotId))) { + return; + } + + Path baseMetadataPath = pathFactory.toMetadataPath(snapshotId - 1); + if (table.fileIO().exists(baseMetadataPath)) { + createMetadataWithBase( + fileChangesCollector, + snapshotId, + IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath)); + } else { + createMetadataWithoutBase(snapshotId); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private void createMetadataWithoutBase(long snapshotId) throws IOException { + SnapshotReader snapshotReader = table.newSnapshotReader().withSnapshot(snapshotId); + Iterator entryIterator = + snapshotReader.read().dataSplits().stream() + .filter(DataSplit::rawConvertible) + .flatMap(s -> dataSplitToManifestEntries(s, snapshotId).stream()) + .iterator(); + List manifestFileMetas = + manifestFile.rollingWrite(entryIterator, snapshotId); + String manifestListFileName = manifestList.writeWithoutRolling(manifestFileMetas); + + List partitionFields = + getPartitionFields(table.schema().logicalPartitionType()); + int schemaId = (int) table.schema().id(); + IcebergSnapshot snapshot = + new IcebergSnapshot( + snapshotId, + snapshotId, + System.currentTimeMillis(), + IcebergSnapshotSummary.APPEND, + pathFactory.toManifestListPath(manifestListFileName).toString(), + schemaId); + + String tableUuid = UUID.randomUUID().toString(); + IcebergMetadata metadata = + new IcebergMetadata( + tableUuid, + table.location().toString(), + snapshotId, + table.schema().highestFieldId(), + Collections.singletonList(new IcebergSchema(table.schema())), + schemaId, + Collections.singletonList(new IcebergPartitionSpec(partitionFields)), + partitionFields.stream() + .mapToInt(IcebergPartitionField::fieldId) + .max() + .orElse( + // not sure why, this is a result tested by hand + IcebergPartitionField.FIRST_FIELD_ID - 1), + Collections.singletonList(snapshot), + (int) snapshotId); + table.fileIO().tryToWriteAtomic(pathFactory.toMetadataPath(snapshotId), metadata.toJson()); + table.fileIO() + .overwriteFileUtf8( + new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME), + String.valueOf(snapshotId)); + } + + private List dataSplitToManifestEntries( + DataSplit dataSplit, long snapshotId) { + List result = new ArrayList<>(); + for (RawFile rawFile : dataSplit.convertToRawFiles().get()) { + IcebergDataFileMeta fileMeta = + new IcebergDataFileMeta( + IcebergDataFileMeta.Content.DATA, + rawFile.path(), + rawFile.format(), + dataSplit.partition(), + rawFile.rowCount(), + rawFile.fileSize()); + result.add( + new IcebergManifestEntry( + IcebergManifestEntry.Status.ADDED, + snapshotId, + snapshotId, + snapshotId, + fileMeta)); + } + return result; + } + + private void createMetadataWithBase( + FileChangesCollector fileChangesCollector, + long snapshotId, + IcebergMetadata baseMetadata) + throws IOException { + List baseManifestFileMetas = + manifestList.read(baseMetadata.currentSnapshot().manifestList()); + + Map removedFiles = new LinkedHashMap<>(); + Map> addedFiles = new LinkedHashMap<>(); + boolean isAddOnly = fileChangesCollector.collect(removedFiles, addedFiles); + Set modifiedPartitionsSet = new LinkedHashSet<>(removedFiles.values()); + modifiedPartitionsSet.addAll( + 
addedFiles.values().stream().map(Pair::getLeft).collect(Collectors.toList())); + List modifiedPartitions = new ArrayList<>(modifiedPartitionsSet); + + // Note that this check may be different from `removedFiles.isEmpty()`, + // because if a file's level is changed, it will first be removed and then added. + // In this case, if `baseMetadata` already contains this file, we should not add a + // duplicate. + List newManifestFileMetas; + IcebergSnapshotSummary snapshotSummary; + if (isAddOnly) { + // Fast case. We don't need to remove files from `baseMetadata`. We only need to append + // new metadata files. + newManifestFileMetas = new ArrayList<>(baseManifestFileMetas); + newManifestFileMetas.addAll(createNewlyAddedManifestFileMetas(addedFiles, snapshotId)); + snapshotSummary = IcebergSnapshotSummary.APPEND; + } else { + Pair, IcebergSnapshotSummary> result = + createWithDeleteManifestFileMetas( + removedFiles, + addedFiles, + modifiedPartitions, + baseManifestFileMetas, + snapshotId); + newManifestFileMetas = result.getLeft(); + snapshotSummary = result.getRight(); + } + String manifestListFileName = manifestList.writeWithoutRolling(newManifestFileMetas); + + // add new schema if needed + int schemaId = (int) table.schema().id(); + List schemas = baseMetadata.schemas(); + if (baseMetadata.currentSchemaId() != schemaId) { + schemas = new ArrayList<>(schemas); + schemas.add(new IcebergSchema(table.schema())); + } + + List snapshots = new ArrayList<>(baseMetadata.snapshots()); + snapshots.add( + new IcebergSnapshot( + snapshotId, + snapshotId, + System.currentTimeMillis(), + snapshotSummary, + pathFactory.toManifestListPath(manifestListFileName).toString(), + schemaId)); + + IcebergMetadata metadata = + new IcebergMetadata( + baseMetadata.tableUuid(), + baseMetadata.location(), + snapshotId, + table.schema().highestFieldId(), + schemas, + schemaId, + baseMetadata.partitionSpecs(), + baseMetadata.lastPartitionId(), + snapshots, + (int) snapshotId); + table.fileIO().tryToWriteAtomic(pathFactory.toMetadataPath(snapshotId), metadata.toJson()); + table.fileIO() + .overwriteFileUtf8( + new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME), + String.valueOf(snapshotId)); + } + + private interface FileChangesCollector { + boolean collect( + Map removedFiles, + Map> addedFiles) + throws IOException; + } + + private boolean collectFileChanges( + List manifestEntries, + Map removedFiles, + Map> addedFiles) { + boolean isAddOnly = true; + for (ManifestEntry entry : manifestEntries) { + String path = + fileStorePathFactory.bucketPath(entry.partition(), entry.bucket()) + + "/" + + entry.fileName(); + switch (entry.kind()) { + case ADD: + if (shouldAddFileToIceberg(entry.file())) { + removedFiles.remove(path); + addedFiles.put(path, Pair.of(entry.partition(), entry.file())); + } + break; + case DELETE: + isAddOnly = false; + addedFiles.remove(path); + removedFiles.put(path, entry.partition()); + break; + default: + throw new UnsupportedOperationException( + "Unknown ManifestEntry FileKind " + entry.kind()); + } + } + return isAddOnly; + } + + private boolean collectFileChanges( + long snapshotId, + Map removedFiles, + Map> addedFiles) { + return collectFileChanges( + table.store() + .newScan() + .withKind(ScanMode.DELTA) + .withSnapshot(snapshotId) + .plan() + .files(), + removedFiles, + addedFiles); + } + + protected abstract boolean shouldAddFileToIceberg(DataFileMeta meta); + + private List createNewlyAddedManifestFileMetas( + Map> addedFiles, long currentSnapshotId) + throws IOException { + 
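+        // Each newly added Paimon data file becomes an Iceberg DATA file entry with ADDED status;
+        // the entries are then written out as (possibly several) rolling Iceberg manifest files.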
if (addedFiles.isEmpty()) { + return Collections.emptyList(); + } + + return manifestFile.rollingWrite( + addedFiles.entrySet().stream() + .map( + e -> { + IcebergDataFileMeta fileMeta = + new IcebergDataFileMeta( + IcebergDataFileMeta.Content.DATA, + e.getKey(), + e.getValue().getRight().fileFormat(), + e.getValue().getLeft(), + e.getValue().getRight().rowCount(), + e.getValue().getRight().fileSize()); + return new IcebergManifestEntry( + IcebergManifestEntry.Status.ADDED, + currentSnapshotId, + currentSnapshotId, + currentSnapshotId, + fileMeta); + }) + .iterator(), + currentSnapshotId); + } + + private Pair, IcebergSnapshotSummary> + createWithDeleteManifestFileMetas( + Map removedFiles, + Map> addedFiles, + List modifiedPartitions, + List baseManifestFileMetas, + long currentSnapshotId) + throws IOException { + IcebergSnapshotSummary snapshotSummary = IcebergSnapshotSummary.APPEND; + List newManifestFileMetas = new ArrayList<>(); + + RowType partitionType = table.schema().logicalPartitionType(); + PartitionPredicate predicate = + PartitionPredicate.fromMultiple(partitionType, modifiedPartitions); + + for (IcebergManifestFileMeta fileMeta : baseManifestFileMetas) { + // use partition predicate to only check modified partitions + int numFields = partitionType.getFieldCount(); + GenericRow minValues = new GenericRow(numFields); + GenericRow maxValues = new GenericRow(numFields); + long[] nullCounts = new long[numFields]; + for (int i = 0; i < numFields; i++) { + IcebergPartitionSummary summary = fileMeta.partitions().get(i); + DataType fieldType = partitionType.getTypeAt(i); + minValues.setField( + i, IcebergConversions.toPaimonObject(fieldType, summary.lowerBound())); + maxValues.setField( + i, IcebergConversions.toPaimonObject(fieldType, summary.upperBound())); + // IcebergPartitionSummary only has `containsNull` field and does not have the + // exact number of nulls. + nullCounts[i] = summary.containsNull() ? 1 : 0; + } + + if (predicate == null + || predicate.test( + fileMeta.liveRowsCount(), + minValues, + maxValues, + new GenericArray(nullCounts))) { + // check if any IcebergManifestEntry in this manifest file meta is removed + List entries = + manifestFile.read(new Path(fileMeta.manifestPath()).getName()); + boolean canReuseFile = true; + for (IcebergManifestEntry entry : entries) { + if (entry.isLive()) { + String path = entry.file().filePath(); + if (addedFiles.containsKey(path)) { + // added file already exists (most probably due to level changes), + // remove it to not add a duplicate. + addedFiles.remove(path); + } else if (removedFiles.containsKey(path)) { + canReuseFile = false; + } + } + } + + if (canReuseFile) { + // nothing is removed, use this file meta again + newManifestFileMetas.add(fileMeta); + } else { + // some file is removed, rewrite this file meta + snapshotSummary = IcebergSnapshotSummary.OVERWRITE; + List newEntries = new ArrayList<>(); + for (IcebergManifestEntry entry : entries) { + if (entry.isLive()) { + newEntries.add( + new IcebergManifestEntry( + removedFiles.containsKey(entry.file().filePath()) + ? 
IcebergManifestEntry.Status.DELETED + : IcebergManifestEntry.Status.EXISTING, + entry.snapshotId(), + entry.sequenceNumber(), + entry.fileSequenceNumber(), + entry.file())); + } + } + newManifestFileMetas.addAll( + manifestFile.rollingWrite(newEntries.iterator(), currentSnapshotId)); + } + } + } + + newManifestFileMetas.addAll( + createNewlyAddedManifestFileMetas(addedFiles, currentSnapshotId)); + return Pair.of(newManifestFileMetas, snapshotSummary); + } + + private List getPartitionFields(RowType partitionType) { + List result = new ArrayList<>(); + int fieldId = IcebergPartitionField.FIRST_FIELD_ID; + for (DataField field : partitionType.getFields()) { + result.add(new IcebergPartitionField(field, fieldId)); + fieldId++; + } + return result; + } + + @Override + public void close() throws Exception {} +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AppendOnlyIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AppendOnlyIcebergCommitCallback.java new file mode 100644 index 000000000000..3356107545f8 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AppendOnlyIcebergCommitCallback.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg; + +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.table.FileStoreTable; + +/** {@link AbstractIcebergCommitCallback} for append only tables. */ +public class AppendOnlyIcebergCommitCallback extends AbstractIcebergCommitCallback { + + public AppendOnlyIcebergCommitCallback(FileStoreTable table, String commitUser) { + super(table, commitUser); + } + + @Override + protected boolean shouldAddFileToIceberg(DataFileMeta meta) { + return true; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java deleted file mode 100644 index 158f05a32b98..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.iceberg; - -import org.apache.paimon.Snapshot; -import org.apache.paimon.format.FileFormat; -import org.apache.paimon.fs.Path; -import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta; -import org.apache.paimon.iceberg.manifest.IcebergManifestEntry; -import org.apache.paimon.iceberg.manifest.IcebergManifestFile; -import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta; -import org.apache.paimon.iceberg.manifest.IcebergManifestList; -import org.apache.paimon.iceberg.metadata.IcebergMetadata; -import org.apache.paimon.iceberg.metadata.IcebergPartitionField; -import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec; -import org.apache.paimon.iceberg.metadata.IcebergSchema; -import org.apache.paimon.iceberg.metadata.IcebergSnapshot; -import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary; -import org.apache.paimon.manifest.ManifestCommittable; -import org.apache.paimon.manifest.ManifestEntry; -import org.apache.paimon.options.Options; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.sink.CommitCallback; -import org.apache.paimon.table.source.DataSplit; -import org.apache.paimon.table.source.RawFile; -import org.apache.paimon.table.source.snapshot.SnapshotReader; -import org.apache.paimon.types.DataField; -import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.Preconditions; -import org.apache.paimon.utils.SnapshotManager; - -import javax.annotation.Nullable; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.UUID; - -/** - * A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg readers can read - * Paimon's {@link RawFile}. 
- */ -public class IcebergCommitCallback implements CommitCallback { - - // see org.apache.iceberg.hadoop.Util - private static final String VERSION_HINT_FILENAME = "version-hint.text"; - - private final FileStoreTable table; - private final String commitUser; - private final IcebergPathFactory pathFactory; - - private final IcebergManifestFile manifestFile; - private final IcebergManifestList manifestList; - - public IcebergCommitCallback(FileStoreTable table, String commitUser) { - this.table = table; - this.commitUser = commitUser; - this.pathFactory = new IcebergPathFactory(table.location()); - - RowType partitionType = table.schema().logicalPartitionType(); - RowType entryType = IcebergManifestEntry.schema(partitionType); - Options manifestFileAvroOptions = Options.fromMap(table.options()); - // https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestReader.java - manifestFileAvroOptions.set( - "avro.row-name-mapping", - "org.apache.paimon.avro.generated.record:manifest_entry," - + "manifest_entry_data_file:r2," - + "r2_partition:r102"); - FileFormat manifestFileAvro = FileFormat.getFileFormat(manifestFileAvroOptions, "avro"); - this.manifestFile = - new IcebergManifestFile( - table.fileIO(), - partitionType, - manifestFileAvro.createReaderFactory(entryType), - manifestFileAvro.createWriterFactory(entryType), - table.coreOptions().manifestCompression(), - pathFactory.manifestFileFactory(), - table.coreOptions().manifestTargetSize()); - - Options manifestListAvroOptions = Options.fromMap(table.options()); - // https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestLists.java - manifestListAvroOptions.set( - "avro.row-name-mapping", - "org.apache.paimon.avro.generated.record:manifest_file," - + "manifest_file_partitions:r508"); - FileFormat manifestListAvro = FileFormat.getFileFormat(manifestListAvroOptions, "avro"); - this.manifestList = - new IcebergManifestList( - table.fileIO(), - manifestListAvro.createReaderFactory(IcebergManifestFileMeta.schema()), - manifestListAvro.createWriterFactory(IcebergManifestFileMeta.schema()), - table.coreOptions().manifestCompression(), - pathFactory.manifestListFactory()); - } - - @Override - public void call( - List committedEntries, long identifier, @Nullable Long watermark) { - try { - commitMetadata(identifier); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - public void retry(ManifestCommittable committable) { - try { - commitMetadata(committable.identifier()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - private void commitMetadata(long identifier) throws IOException { - Pair pair = getCurrentAndBaseSnapshotIds(identifier); - long currentSnapshot = pair.getLeft(); - Long baseSnapshot = pair.getRight(); - - createMetadataWithoutBase(currentSnapshot); - } - - private Pair getCurrentAndBaseSnapshotIds(long commitIdentifier) { - SnapshotManager snapshotManager = table.snapshotManager(); - List currentSnapshots = - snapshotManager.findSnapshotsForIdentifiers( - commitUser, Collections.singletonList(commitIdentifier)); - Preconditions.checkArgument( - currentSnapshots.size() == 1, - "Cannot find snapshot with user {} and identifier {}", - commitUser, - commitIdentifier); - long currentSnapshotId = currentSnapshots.get(0).id(); - - long earliest = - Preconditions.checkNotNull( - snapshotManager.earliestSnapshotId(), - "Cannot determine earliest snapshot ID. 
This is unexpected."); - Long baseSnapshotId = null; - for (long id = currentSnapshotId - 1; id >= earliest; id--) { - try { - Snapshot snapshot = snapshotManager.snapshot(id); - if (!snapshot.commitUser().equals(commitUser) - || snapshot.commitIdentifier() < commitIdentifier) { - baseSnapshotId = id; - break; - } - } catch (Exception ignore) { - break; - } - } - - return Pair.of(currentSnapshotId, baseSnapshotId); - } - - private void createMetadataWithoutBase(long snapshotId) throws IOException { - SnapshotReader snapshotReader = table.newSnapshotReader().withSnapshot(snapshotId); - Iterator entryIterator = - snapshotReader.read().dataSplits().stream() - .filter(DataSplit::rawConvertible) - .flatMap(s -> dataSplitToDataFileMeta(s).stream()) - .map( - m -> - new IcebergManifestEntry( - IcebergManifestEntry.Status.ADDED, - snapshotId, - snapshotId, - snapshotId, - m)) - .iterator(); - List manifestFileMetas = - manifestFile.rollingWrite(entryIterator, snapshotId); - String manifestListFileName = manifestList.writeWithoutRolling(manifestFileMetas); - - List partitionFields = - getPartitionFields(table.schema().logicalPartitionType()); - int schemaId = (int) table.schema().id(); - IcebergSnapshot snapshot = - new IcebergSnapshot( - snapshotId, - snapshotId, - System.currentTimeMillis(), - new IcebergSnapshotSummary(IcebergSnapshotSummary.OPERATION_APPEND), - pathFactory.toManifestListPath(manifestListFileName).toString(), - schemaId); - - String tableUuid = UUID.randomUUID().toString(); - IcebergMetadata metadata = - new IcebergMetadata( - tableUuid, - table.location().toString(), - snapshotId, - table.schema().highestFieldId(), - Collections.singletonList(new IcebergSchema(table.schema())), - schemaId, - Collections.singletonList(new IcebergPartitionSpec(partitionFields)), - partitionFields.stream() - .mapToInt(IcebergPartitionField::fieldId) - .max() - .orElse( - // not sure why, this is a result tested by hand - IcebergPartitionField.FIRST_FIELD_ID - 1), - Collections.singletonList(snapshot), - (int) snapshotId); - table.fileIO().tryToWriteAtomic(pathFactory.toMetadataPath(snapshotId), metadata.toJson()); - table.fileIO() - .overwriteFileUtf8( - new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME), - String.valueOf(snapshotId)); - } - - private List dataSplitToDataFileMeta(DataSplit dataSplit) { - List result = new ArrayList<>(); - for (RawFile rawFile : dataSplit.convertToRawFiles().get()) { - result.add( - new IcebergDataFileMeta( - IcebergDataFileMeta.Content.DATA, - rawFile.path(), - rawFile.format(), - dataSplit.partition(), - rawFile.rowCount(), - rawFile.fileSize())); - } - return result; - } - - private List getPartitionFields(RowType partitionType) { - List result = new ArrayList<>(); - int fieldId = IcebergPartitionField.FIRST_FIELD_ID; - for (DataField field : partitionType.getFields()) { - result.add(new IcebergPartitionField(field, fieldId)); - fieldId++; - } - return result; - } - - @Override - public void close() throws Exception {} -} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/PrimaryKeyIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/PrimaryKeyIcebergCommitCallback.java new file mode 100644 index 000000000000..12b9c3e604d3 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/PrimaryKeyIcebergCommitCallback.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg; + +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.table.FileStoreTable; + +/** {@link AbstractIcebergCommitCallback} for primary key tables. */ +public class PrimaryKeyIcebergCommitCallback extends AbstractIcebergCommitCallback { + + public PrimaryKeyIcebergCommitCallback(FileStoreTable table, String commitUser) { + super(table, commitUser); + } + + @Override + protected boolean shouldAddFileToIceberg(DataFileMeta meta) { + int maxLevel = table.coreOptions().numLevels() - 1; + return meta.level() == maxLevel; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java index fb83dd52cec1..1d9e1c3b16e9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java @@ -18,13 +18,16 @@ package org.apache.paimon.iceberg.manifest; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DecimalType; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; +import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; import java.nio.charset.StandardCharsets; @@ -40,6 +43,8 @@ private IcebergConversions() {} private static final ThreadLocal ENCODER = ThreadLocal.withInitial(StandardCharsets.UTF_8::newEncoder); + private static final ThreadLocal DECODER = + ThreadLocal.withInitial(StandardCharsets.UTF_8::newDecoder); public static ByteBuffer toByteBuffer(DataType type, Object value) { switch (type.getTypeRoot()) { @@ -78,4 +83,37 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) { throw new UnsupportedOperationException("Cannot serialize type: " + type); } } + + public static Object toPaimonObject(DataType type, byte[] bytes) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return bytes[0] != 0; + case INTEGER: + case DATE: + return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt(); + case BIGINT: + return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong(); + case FLOAT: + return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getFloat(); + case DOUBLE: + return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getDouble(); + case CHAR: + case VARCHAR: + try { + return BinaryString.fromString( + DECODER.get().decode(ByteBuffer.wrap(bytes)).toString()); + } catch (CharacterCodingException e) { + throw new RuntimeException("Failed to decode bytes as UTF-8", e); + } + case BINARY: + case VARBINARY: + return bytes; + case DECIMAL: + DecimalType 
decimalType = (DecimalType) type; + return Decimal.fromUnscaledBytes( + bytes, decimalType.getPrecision(), decimalType.getScale()); + default: + throw new UnsupportedOperationException("Cannot deserialize type: " + type); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java index 6b76fb7572a7..213ec01316b0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java @@ -90,6 +90,10 @@ public Status status() { return status; } + public boolean isLive() { + return status == Status.ADDED || status == Status.EXISTING; + } + public long snapshotId() { return snapshotId; } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java index 571b249608fc..01b3b96acbf6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java @@ -157,6 +157,10 @@ public long deletedRowsCount() { return deletedRowsCount; } + public long liveRowsCount() { + return addedRowsCount + existingRowsCount; + } + public List partitions() { return partitions; } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java index 0c70331eebb0..c620c4809808 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java @@ -35,8 +35,8 @@ public class IcebergSnapshotSummary { private static final String FIELD_OPERATION = "operation"; - public static final String OPERATION_APPEND = "append"; - public static final String OPERATION_OVERWRITE = "overwrite"; + public static final IcebergSnapshotSummary APPEND = new IcebergSnapshotSummary("append"); + public static final IcebergSnapshotSummary OVERWRITE = new IcebergSnapshotSummary("overwrite"); @JsonProperty(FIELD_OPERATION) private final String operation; diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java index 27468bc2fe6b..4f7d3d554ae2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java @@ -18,6 +18,7 @@ package org.apache.paimon.metastore; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestCommittable; @@ -28,8 +29,6 @@ import org.apache.paimon.shade.guava30.com.google.common.cache.Cache; import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder; -import javax.annotation.Nullable; - import java.time.Duration; import java.util.List; @@ -52,8 +51,7 @@ public AddPartitionCommitCallback(MetastoreClient client) { } @Override - public void call( - List committedEntries, long identifier, @Nullable Long watermark) { + public void call(List committedEntries, Snapshot snapshot) { 
committedEntries.stream() .filter(e -> FileKind.ADD.equals(e.kind())) .map(ManifestEntry::partition) diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java index 3c477f917936..7e2e00dce3dc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java @@ -18,13 +18,12 @@ package org.apache.paimon.metastore; +import org.apache.paimon.Snapshot; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.table.sink.CommitCallback; import org.apache.paimon.tag.TagPreview; -import javax.annotation.Nullable; - import java.util.List; import java.util.Optional; @@ -40,10 +39,9 @@ public TagPreviewCommitCallback(AddPartitionTagCallback tagCallback, TagPreview } @Override - public void call( - List committedEntries, long identifier, @Nullable Long watermark) { + public void call(List committedEntries, Snapshot snapshot) { long currentMillis = System.currentTimeMillis(); - Optional tagOptional = tagPreview.extractTag(currentMillis, watermark); + Optional tagOptional = tagPreview.extractTag(currentMillis, snapshot.watermark()); tagOptional.ifPresent(tagCallback::notifyCreation); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 80ce33ce34de..d2034d71c487 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -1007,7 +1007,7 @@ boolean tryCommitOnce( identifier, commitKind.name())); } - commitCallbacks.forEach(callback -> callback.call(tableFiles, identifier, watermark)); + commitCallbacks.forEach(callback -> callback.call(tableFiles, newSnapshot)); return true; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index a2609cee4e64..badbf1a4526f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -24,7 +24,6 @@ import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; -import org.apache.paimon.iceberg.IcebergCommitCallback; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.metastore.AddPartitionCommitCallback; @@ -397,7 +396,7 @@ public TableCommitImpl newCommit(String commitUser) { options.forceCreatingSnapshot()); } - private List createCommitCallbacks(String commitUser) { + protected List createCommitCallbacks(String commitUser) { List callbacks = new ArrayList<>(CallbackUtils.loadCommitCallbacks(coreOptions())); CoreOptions options = coreOptions(); @@ -423,10 +422,6 @@ private List createCommitCallbacks(String commitUser) { callbacks.add(callback); } - if (options.metadataIcebergCompatible()) { - callbacks.add(new IcebergCommitCallback(this, commitUser)); - } - return callbacks; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java 
index 0af78a5dac8b..4b6dd9b08270 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.iceberg.AppendOnlyIcebergCommitCallback; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.operation.AppendOnlyFileStoreScan; import org.apache.paimon.operation.AppendOnlyFileStoreWrite; @@ -32,6 +33,7 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.query.LocalTableQuery; +import org.apache.paimon.table.sink.CommitCallback; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.AbstractDataTableRead; import org.apache.paimon.table.source.AppendOnlySplitGenerator; @@ -43,6 +45,7 @@ import org.apache.paimon.utils.Preconditions; import java.io.IOException; +import java.util.List; import java.util.function.BiConsumer; /** {@link FileStoreTable} for append table. */ @@ -157,4 +160,16 @@ public TableWriteImpl newWrite( public LocalTableQuery newLocalTableQuery() { throw new UnsupportedOperationException(); } + + @Override + protected List createCommitCallbacks(String commitUser) { + List callbacks = super.createCommitCallbacks(commitUser); + CoreOptions options = coreOptions(); + + if (options.metadataIcebergCompatible()) { + callbacks.add(new AppendOnlyIcebergCommitCallback(this, commitUser)); + } + + return callbacks; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index b1e5b5366c3d..9c15a7bd1f3d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -23,6 +23,7 @@ import org.apache.paimon.KeyValueFileStore; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.iceberg.PrimaryKeyIcebergCommitCallback; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.mergetree.compact.LookupMergeFunction; import org.apache.paimon.mergetree.compact.MergeFunctionFactory; @@ -33,6 +34,7 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.query.LocalTableQuery; +import org.apache.paimon.table.sink.CommitCallback; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.KeyValueTableRead; @@ -177,4 +179,16 @@ public TableWriteImpl newWrite( public LocalTableQuery newLocalTableQuery() { return new LocalTableQuery(this); } + + @Override + protected List createCommitCallbacks(String commitUser) { + List callbacks = super.createCommitCallbacks(commitUser); + CoreOptions options = coreOptions(); + + if (options.metadataIcebergCompatible()) { + callbacks.add(new PrimaryKeyIcebergCommitCallback(this, commitUser)); + } + + return callbacks; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java index 23f39229de3d..c4b606441df6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java 
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java @@ -18,11 +18,10 @@ package org.apache.paimon.table.sink; +import org.apache.paimon.Snapshot; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; -import javax.annotation.Nullable; - import java.util.List; /** @@ -38,7 +37,7 @@ */ public interface CommitCallback extends AutoCloseable { - void call(List committedEntries, long identifier, @Nullable Long watermark); + void call(List committedEntries, Snapshot snapshot); void retry(ManifestCommittable committable); } diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index 668d91d44bdf..4efeaa855fb2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -26,15 +26,20 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.disk.IOManagerImpl; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.hadoop.conf.Configuration; @@ -52,6 +57,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -59,6 +65,7 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; @@ -67,6 +74,181 @@ public class IcebergCompatibilityTest { @TempDir java.nio.file.Path tempDir; + // ------------------------------------------------------------------------ + // Constructed Tests + // ------------------------------------------------------------------------ + + @Test + public void testFileLevelChange() throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"}); + FileStoreTable table = + createPaimonTable( + rowType, Collections.emptyList(), Collections.singletonList("k"), 1); + + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser); + + write.write(GenericRow.of(1, 10)); + write.write(GenericRow.of(2, 20)); + commit.commit(1, write.prepareCommit(false, 1)); + assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)"); + + write.compact(BinaryRow.EMPTY_ROW, 0, true); + commit.commit(2, write.prepareCommit(true, 2)); + assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)"); + + write.close(); + commit.close(); + } + + @Test + public void testDelete() throws Exception { + testDeleteImpl(false); + } 
+ + @Test + public void testDeleteWithDeletionVector() throws Exception { + testDeleteImpl(true); + } + + private void testDeleteImpl(boolean deletionVector) throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"}); + Map customOptions = new HashMap<>(); + if (deletionVector) { + customOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"); + } + FileStoreTable table = + createPaimonTable( + rowType, + Collections.emptyList(), + Collections.singletonList("k"), + 1, + customOptions); + + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl write = + table.newWrite(commitUser) + .withIOManager(new IOManagerImpl(tempDir.toString() + "/tmp")); + TableCommitImpl commit = table.newCommit(commitUser); + + write.write(GenericRow.of(1, 10)); + write.write(GenericRow.of(2, 20)); + commit.commit(1, write.prepareCommit(false, 1)); + assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)"); + + write.write(GenericRow.of(2, 21)); + write.compact(BinaryRow.EMPTY_ROW, 0, true); + commit.commit(2, write.prepareCommit(true, 2)); + assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 21)"); + + write.write(GenericRow.ofKind(RowKind.DELETE, 1, 10)); + commit.commit(3, write.prepareCommit(false, 3)); + // not changed because no full compaction + assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 21)"); + + write.compact(BinaryRow.EMPTY_ROW, 0, true); + commit.commit(4, write.prepareCommit(true, 4)); + if (deletionVector) { + // level 0 file is compacted into deletion vector, so max level data file does not + // change + // this is still a valid table state at some time in the history + assertThat(getIcebergResult()) + .containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 21)"); + } else { + assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(2, 21)"); + } + + write.close(); + commit.close(); + } + + @Test + public void testRetryCreateMetadata() throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"}); + FileStoreTable table = + createPaimonTable( + rowType, Collections.emptyList(), Collections.singletonList("k"), 1); + + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser); + + write.write(GenericRow.of(1, 10)); + write.write(GenericRow.of(2, 20)); + commit.commit(1, write.prepareCommit(false, 1)); + assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)"); + + write.write(GenericRow.of(1, 11)); + write.write(GenericRow.of(3, 30)); + write.compact(BinaryRow.EMPTY_ROW, 0, true); + List commitMessages2 = write.prepareCommit(true, 2); + commit.commit(2, commitMessages2); + assertThat(table.latestSnapshotId()).hasValue(3L); + + IcebergPathFactory pathFactory = new IcebergPathFactory(table.location()); + Path metadata3Path = pathFactory.toMetadataPath(3); + assertThat(table.fileIO().exists(metadata3Path)).isTrue(); + + table.fileIO().deleteQuietly(metadata3Path); + Map> retryMessages = new HashMap<>(); + retryMessages.put(2L, commitMessages2); + commit.filterAndCommit(retryMessages); + assertThat(getIcebergResult()) + .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 20)", "Record(3, 30)"); + + write.close(); + commit.close(); + } + + @Test + public void testSchemaChange() throws Exception { + 
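+        // Adding a column via SchemaManager should register a new Iceberg schema in the metadata;
+        // rows written before the change are expected to read back with null for the new column.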
        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"});
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType, Collections.emptyList(), Collections.singletonList("k"), 1);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 10));
+        write.write(GenericRow.of(2, 20));
+        commit.commit(1, write.prepareCommit(false, 1));
+        assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)");
+
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
+        schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.STRING()));
+        table = table.copyWithLatestSchema();
+        write.close();
+        write = table.newWrite(commitUser);
+        commit.close();
+        commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 11, BinaryString.fromString("one")));
+        write.write(GenericRow.of(3, 30, BinaryString.fromString("three")));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(2, write.prepareCommit(true, 2));
+        assertThat(getIcebergResult())
+                .containsExactlyInAnyOrder(
+                        "Record(1, 11, one)", "Record(2, 20, null)", "Record(3, 30, three)");
+
+        write.close();
+        commit.close();
+    }
+
+    // ------------------------------------------------------------------------
+    // Random Tests
+    // ------------------------------------------------------------------------
+
     @Test
     public void testUnPartitionedPrimaryKeyTable() throws Exception {
         RowType rowType =
@@ -76,20 +258,30 @@ public void testUnPartitionedPrimaryKeyTable() throws Exception {
                         },
                         new String[] {"k1", "k2", "v1", "v2"});

-        int numRecords = 1000;
+        int numRounds = 5;
+        int numRecords = 500;
         ThreadLocalRandom random = ThreadLocalRandom.current();
-        List<TestRecord> testRecords = new ArrayList<>();
-        for (int i = 0; i < numRecords; i++) {
-            int k1 = random.nextInt(0, 100);
-            String k2 = String.valueOf(random.nextInt(1000, 1010));
-            int v1 = random.nextInt();
-            long v2 = random.nextLong();
-            testRecords.add(
-                    new TestRecord(
-                            BinaryRow.EMPTY_ROW,
-                            String.format("%d|%s", k1, k2),
-                            String.format("%d|%d", v1, v2),
-                            GenericRow.of(k1, BinaryString.fromString(k2), v1, v2)));
+        List<List<TestRecord>> testRecords = new ArrayList<>();
+        List<List<String>> expected = new ArrayList<>();
+        Map<String, String> expectedMap = new LinkedHashMap<>();
+        for (int r = 0; r < numRounds; r++) {
+            List<TestRecord> round = new ArrayList<>();
+            for (int i = 0; i < numRecords; i++) {
+                int k1 = random.nextInt(0, 100);
+                String k2 = String.valueOf(random.nextInt(1000, 1010));
+                int v1 = random.nextInt();
+                long v2 = random.nextLong();
+                round.add(
+                        new TestRecord(
+                                BinaryRow.EMPTY_ROW,
+                                GenericRow.of(k1, BinaryString.fromString(k2), v1, v2)));
+                expectedMap.put(String.format("%d, %s", k1, k2), String.format("%d, %d", v1, v2));
+            }
+            testRecords.add(round);
+            expected.add(
+                    expectedMap.entrySet().stream()
+                            .map(e -> String.format("Record(%s, %s)", e.getKey(), e.getValue()))
+                            .collect(Collectors.toList()));
         }

         runCompatibilityTest(
@@ -97,8 +289,8 @@ public void testUnPartitionedPrimaryKeyTable() throws Exception {
                 Collections.emptyList(),
                 Arrays.asList("k1", "k2"),
                 testRecords,
-                r -> String.format("%d|%s", r.get(0, Integer.class), r.get(1, String.class)),
-                r -> String.format("%d|%d", r.get(2, Integer.class), r.get(3, Long.class)));
+                expected,
+                Record::toString);
     }

     @Test
@@ -124,26 +316,37 @@ public void testPartitionedPrimaryKeyTable() throws Exception {
                     return b;
                 };

-        int numRecords = 1000;
+        int numRounds = 2;
+        int numRecords = 3;
         ThreadLocalRandom random = ThreadLocalRandom.current();
-        List<TestRecord> testRecords = new ArrayList<>();
-        for (int i = 0; i < numRecords; i++) {
-            int pt1 = random.nextInt(0, 2);
-            String pt2 = String.valueOf(random.nextInt(10, 12));
-            String k = String.valueOf(random.nextInt(0, 100));
-            int v1 = random.nextInt();
-            long v2 = random.nextLong();
-            testRecords.add(
-                    new TestRecord(
-                            binaryRow.apply(pt1, pt2),
-                            String.format("%d|%s|%s", pt1, pt2, k),
-                            String.format("%d|%d", v1, v2),
-                            GenericRow.of(
-                                    pt1,
-                                    BinaryString.fromString(pt2),
-                                    BinaryString.fromString(k),
-                                    v1,
-                                    v2)));
+        List<List<TestRecord>> testRecords = new ArrayList<>();
+        List<List<String>> expected = new ArrayList<>();
+        Map<String, String> expectedMap = new LinkedHashMap<>();
+        for (int r = 0; r < numRounds; r++) {
+            List<TestRecord> round = new ArrayList<>();
+            for (int i = 0; i < numRecords; i++) {
+                int pt1 = random.nextInt(0, 2);
+                String pt2 = String.valueOf(random.nextInt(10, 12));
+                String k = String.valueOf(random.nextInt(0, 100));
+                int v1 = random.nextInt();
+                long v2 = random.nextLong();
+                round.add(
+                        new TestRecord(
+                                binaryRow.apply(pt1, pt2),
+                                GenericRow.of(
+                                        pt1,
+                                        BinaryString.fromString(pt2),
+                                        BinaryString.fromString(k),
+                                        v1,
+                                        v2)));
+                expectedMap.put(
+                        String.format("%d, %s, %s", pt1, pt2, k), String.format("%d, %d", v1, v2));
+            }
+            testRecords.add(round);
+            expected.add(
+                    expectedMap.entrySet().stream()
+                            .map(e -> String.format("Record(%s, %s)", e.getKey(), e.getValue()))
+                            .collect(Collectors.toList()));
         }

         runCompatibilityTest(
@@ -151,13 +354,8 @@ public void testPartitionedPrimaryKeyTable() throws Exception {
                 Arrays.asList("pt1", "pt2"),
                 Arrays.asList("pt1", "pt2", "k"),
                 testRecords,
-                r ->
-                        String.format(
-                                "%d|%s|%s",
-                                r.get(0, Integer.class),
-                                r.get(1, String.class),
-                                r.get(2, String.class)),
-                r -> String.format("%d|%d", r.get(3, Integer.class), r.get(4, Long.class)));
+                expected,
+                Record::toString);
     }

     @Test
@@ -200,53 +398,64 @@ public void testAppendOnlyTableWithAllTypes() throws Exception {
                     return b;
                 };

-        int numRecords = 1000;
+        int numRounds = 5;
+        int numRecords = 500;
         ThreadLocalRandom random = ThreadLocalRandom.current();
-        List<TestRecord> testRecords = new ArrayList<>();
-        for (int i = 0; i < numRecords; i++) {
-            int pt = random.nextInt(0, 2);
-            boolean vBoolean = random.nextBoolean();
-            long vBigInt = random.nextLong();
-            float vFloat = random.nextFloat();
-            double vDouble = random.nextDouble();
-            Decimal vDecimal = Decimal.fromUnscaledLong(random.nextLong(0, 100000000), 8, 3);
-            String vChar = String.valueOf(random.nextInt());
-            String vVarChar = String.valueOf(random.nextInt());
-            byte[] vBinary = String.valueOf(random.nextInt()).getBytes();
-            byte[] vVarBinary = String.valueOf(random.nextInt()).getBytes();
-            int vDate = random.nextInt(0, 30000);
-
-            String k =
-                    String.format(
-                            "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s",
-                            pt,
-                            vBoolean,
-                            vBigInt,
-                            vFloat,
-                            vDouble,
-                            vDecimal,
-                            vChar,
-                            vVarChar,
-                            new String(vBinary),
-                            new String(vVarBinary),
-                            LocalDate.ofEpochDay(vDate));
-            testRecords.add(
-                    new TestRecord(
-                            binaryRow.apply(pt),
-                            k,
-                            "",
-                            GenericRow.of(
-                                    pt,
-                                    vBoolean,
-                                    vBigInt,
-                                    vFloat,
-                                    vDouble,
-                                    vDecimal,
-                                    BinaryString.fromString(vChar),
-                                    BinaryString.fromString(vVarChar),
-                                    vBinary,
-                                    vVarBinary,
-                                    vDate)));
+        List<List<TestRecord>> testRecords = new ArrayList<>();
+        List<List<String>> expected = new ArrayList<>();
+        List<String> currentExpected = new ArrayList<>();
+        for (int r = 0; r < numRounds; r++) {
+            List<TestRecord> round = new ArrayList<>();
+            for (int i = 0; i < numRecords; i++) {
+                int pt = random.nextInt(0, 2);
+                Boolean vBoolean = random.nextBoolean() ? random.nextBoolean() : null;
+                Long vBigInt = random.nextBoolean() ? random.nextLong() : null;
+                Float vFloat = random.nextBoolean() ? random.nextFloat() : null;
+                Double vDouble = random.nextBoolean() ? random.nextDouble() : null;
+                Decimal vDecimal =
+                        random.nextBoolean()
+                                ? Decimal.fromUnscaledLong(random.nextLong(0, 100000000), 8, 3)
+                                : null;
+                String vChar = random.nextBoolean() ? String.valueOf(random.nextInt()) : null;
+                String vVarChar = random.nextBoolean() ? String.valueOf(random.nextInt()) : null;
+                byte[] vBinary =
+                        random.nextBoolean() ? String.valueOf(random.nextInt()).getBytes() : null;
+                byte[] vVarBinary =
+                        random.nextBoolean() ? String.valueOf(random.nextInt()).getBytes() : null;
+                Integer vDate = random.nextBoolean() ? random.nextInt(0, 30000) : null;
+
+                round.add(
+                        new TestRecord(
+                                binaryRow.apply(pt),
+                                GenericRow.of(
+                                        pt,
+                                        vBoolean,
+                                        vBigInt,
+                                        vFloat,
+                                        vDouble,
+                                        vDecimal,
+                                        BinaryString.fromString(vChar),
+                                        BinaryString.fromString(vVarChar),
+                                        vBinary,
+                                        vVarBinary,
+                                        vDate)));
+                currentExpected.add(
+                        String.format(
+                                "%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
+                                pt,
+                                vBoolean,
+                                vBigInt,
+                                vFloat,
+                                vDouble,
+                                vDecimal,
+                                vChar,
+                                vVarChar,
+                                vBinary == null ? null : new String(vBinary),
+                                vVarBinary == null ? null : new String(vVarBinary),
+                                vDate == null ? null : LocalDate.ofEpochDay(vDate)));
+            }
+            testRecords.add(round);
+            expected.add(new ArrayList<>(currentExpected));
         }

         runCompatibilityTest(
@@ -254,95 +463,120 @@ public void testAppendOnlyTableWithAllTypes() throws Exception {
                 Collections.emptyList(),
                 Collections.emptyList(),
                 testRecords,
+                expected,
                 r ->
-                        String.format(
-                                "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s",
-                                r.get(0),
-                                r.get(1),
-                                r.get(2),
-                                r.get(3),
-                                r.get(4),
-                                r.get(5),
-                                r.get(6),
-                                r.get(7),
-                                new String(r.get(8, ByteBuffer.class).array()),
-                                new String(r.get(9, ByteBuffer.class).array()),
-                                r.get(10)),
-                r -> "");
+                        IntStream.range(0, rowType.getFieldCount())
+                                .mapToObj(
+                                        i -> {
+                                            Object field = r.get(i);
+                                            if (field instanceof ByteBuffer) {
+                                                return new String(((ByteBuffer) field).array());
+                                            } else {
+                                                return String.valueOf(field);
+                                            }
+                                        })
+                                .collect(Collectors.joining(", ")));
     }

     private void runCompatibilityTest(
             RowType rowType,
             List<String> partitionKeys,
             List<String> primaryKeys,
-            List<TestRecord> testRecords,
-            Function<Record, String> icebergRecordToKey,
-            Function<Record, String> icebergRecordToValue)
+            List<List<TestRecord>> testRecords,
+            List<List<String>> expected,
+            Function<Record, String> icebergRecordToString)
             throws Exception {
-        LocalFileIO fileIO = LocalFileIO.create();
-        Path path = new Path(tempDir.toString());
-
-        Options options = new Options();
-        if (!primaryKeys.isEmpty()) {
-            options.set(CoreOptions.BUCKET, 2);
-        }
-        options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
-        options.set(CoreOptions.FILE_FORMAT, "avro");
-        Schema schema =
-                new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");
-
-        FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path);
-        paimonCatalog.createDatabase("mydb", false);
-        Identifier paimonIdentifier = Identifier.create("mydb", "t");
-        paimonCatalog.createTable(paimonIdentifier, schema, false);
-        FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType, partitionKeys, primaryKeys, primaryKeys.isEmpty() ? -1 : 2);

         String commitUser = UUID.randomUUID().toString();
         TableWriteImpl<?> write = table.newWrite(commitUser);
         TableCommitImpl commit = table.newCommit(commitUser);

-        Map<String, String> expected = new HashMap<>();
-        for (TestRecord testRecord : testRecords) {
-            expected.put(testRecord.key, testRecord.value);
-            write.write(testRecord.record);
-        }
+        for (int r = 0; r < testRecords.size(); r++) {
+            List<TestRecord> round = testRecords.get(r);
+            for (TestRecord testRecord : round) {
+                write.write(testRecord.record);
+            }

-        if (!primaryKeys.isEmpty()) {
-            for (BinaryRow partition :
-                    testRecords.stream().map(t -> t.partition).collect(Collectors.toSet())) {
-                for (int b = 0; b < 2; b++) {
-                    write.compact(partition, b, true);
+            if (!primaryKeys.isEmpty()) {
+                for (BinaryRow partition :
+                        round.stream().map(t -> t.partition).collect(Collectors.toSet())) {
+                    for (int b = 0; b < 2; b++) {
+                        write.compact(partition, b, true);
+                    }
                 }
             }
-        }

-        commit.commit(1, write.prepareCommit(true, 1));
-        write.close();
-        commit.close();
+            commit.commit(r, write.prepareCommit(true, r));

-        HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(), tempDir.toString());
-        TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
-        org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier);
-        CloseableIterable<Record> result = IcebergGenerics.read(icebergTable).build();
-        Map<String, String> actual = new HashMap<>();
-        for (Record record : result) {
-            actual.put(icebergRecordToKey.apply(record), icebergRecordToValue.apply(record));
+            assertThat(getIcebergResult(icebergRecordToString)).hasSameElementsAs(expected.get(r));
         }
-        result.close();

-        assertThat(actual).isEqualTo(expected);
+        write.close();
+        commit.close();
     }

     private static class TestRecord {
         private final BinaryRow partition;
-        private final String key;
-        private final String value;
         private final GenericRow record;

-        private TestRecord(BinaryRow partition, String key, String value, GenericRow record) {
+        private TestRecord(BinaryRow partition, GenericRow record) {
             this.partition = partition;
-            this.key = key;
-            this.value = value;
             this.record = record;
         }
     }
+
+    // ------------------------------------------------------------------------
+    // Utils
+    // ------------------------------------------------------------------------
+
+    private FileStoreTable createPaimonTable(
+            RowType rowType, List<String> partitionKeys, List<String> primaryKeys, int numBuckets)
+            throws Exception {
+        return createPaimonTable(rowType, partitionKeys, primaryKeys, numBuckets, new HashMap<>());
+    }
+
+    private FileStoreTable createPaimonTable(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            int numBuckets,
+            Map<String, String> customOptions)
+            throws Exception {
+        LocalFileIO fileIO = LocalFileIO.create();
+        Path path = new Path(tempDir.toString());
+
+        Options options = new Options(customOptions);
+        options.set(CoreOptions.BUCKET, numBuckets);
+        options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
+        options.set(CoreOptions.FILE_FORMAT, "avro");
+        Schema schema =
+                new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");
+
+        try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path)) {
+            paimonCatalog.createDatabase("mydb", false);
+            Identifier paimonIdentifier = Identifier.create("mydb", "t");
+            paimonCatalog.createTable(paimonIdentifier, schema, false);
+            return (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+        }
+    }
+
+    private List<String> getIcebergResult() throws Exception {
+        return getIcebergResult(Record::toString);
+    }
+
+    private List<String> getIcebergResult(Function<Record, String> icebergRecordToString)
+            throws Exception {
+        HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(), tempDir.toString());
+        TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
+        org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier);
+        CloseableIterable<Record> result = IcebergGenerics.read(icebergTable).build();
+        List<String> actual = new ArrayList<>();
+        for (Record record : result) {
+            actual.add(icebergRecordToString.apply(record));
+        }
+        result.close();
+        return actual;
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index 18f858c23ec7..227a3b58eeed 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.sink;

 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -42,8 +43,6 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;

-import javax.annotation.Nullable;
-
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -180,8 +179,8 @@ public TestCommitCallback(String testId) {
         }

         @Override
-        public void call(List<ManifestEntry> entries, long identifier, @Nullable Long watermark) {
-            commitCallbackResult.get(testId).add(identifier);
+        public void call(List<ManifestEntry> entries, Snapshot snapshot) {
+            commitCallbackResult.get(testId).add(snapshot.commitIdentifier());
         }

         @Override
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
index 0095e1024a86..c93e10ef90b9 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
@@ -18,7 +18,7 @@

 package org.apache.paimon.spark

-import org.apache.paimon.CoreOptions
+import org.apache.paimon.{CoreOptions, Snapshot}
 import org.apache.paimon.manifest.{ManifestCommittable, ManifestEntry}
 import org.apache.paimon.table.sink.CommitCallback

@@ -64,10 +64,7 @@ object PaimonCommitTest {

   case class CustomCommitCallback(testId: String) extends CommitCallback {

-    override def call(
-        committedEntries: List[ManifestEntry],
-        identifier: Long,
-        watermark: lang.Long): Unit = {
+    override def call(committedEntries: List[ManifestEntry], snapshot: Snapshot): Unit = {
       PaimonCommitTest.id = testId
     }
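
Note on the read path exercised by getIcebergResult() in the test patch above: it is plain Iceberg API, with no Paimon classes involved. It opens the same warehouse directory through Iceberg's HadoopCatalog, loads the table, and scans it with IcebergGenerics. A minimal standalone sketch of that read, assuming a local warehouse path and the "mydb.db" / "t" identifier used by the tests (the class name and warehouse path below are placeholders, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterable;

public class ReadPaimonAsIceberg {
    public static void main(String[] args) throws Exception {
        // Hypothetical warehouse path; in the tests above this is tempDir.toString().
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "/tmp/paimon-warehouse");
        // A Paimon FileSystemCatalog database "mydb" is laid out as the "mydb.db" directory,
        // which is why the tests address it as TableIdentifier.of("mydb.db", "t").
        Table table = catalog.loadTable(TableIdentifier.of("mydb.db", "t"));
        try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
            for (Record record : records) {
                System.out.println(record); // e.g. Record(1, 10)
            }
        }
    }
}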