From 7f15be1f4d2ae5b3d9952032f3ccfa13cdbb8cec Mon Sep 17 00:00:00 2001 From: tsreaper Date: Fri, 12 Jul 2024 15:49:37 +0800 Subject: [PATCH 1/7] [core] Support creating Iceberg metadata based on old ones --- .../paimon/iceberg/IcebergCommitCallback.java | 217 +++++++++++++++-- .../iceberg/manifest/IcebergConversions.java | 38 +++ .../manifest/IcebergManifestEntry.java | 4 + .../manifest/IcebergManifestFileMeta.java | 4 + .../iceberg/IcebergCompatibilityTest.java | 219 ++++++++++-------- 5 files changed, 359 insertions(+), 123 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java index 158f05a32b98..4ebb5c35c8fe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java @@ -19,13 +19,18 @@ package org.apache.paimon.iceberg; import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.Path; +import org.apache.paimon.iceberg.manifest.IcebergConversions; import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta; import org.apache.paimon.iceberg.manifest.IcebergManifestEntry; import org.apache.paimon.iceberg.manifest.IcebergManifestFile; import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta; import org.apache.paimon.iceberg.manifest.IcebergManifestList; +import org.apache.paimon.iceberg.manifest.IcebergPartitionSummary; import org.apache.paimon.iceberg.metadata.IcebergMetadata; import org.apache.paimon.iceberg.metadata.IcebergPartitionField; import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec; @@ -35,12 +40,15 @@ import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitCallback; +import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; @@ -52,9 +60,16 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg readers can read @@ -137,7 +152,11 @@ private void commitMetadata(long identifier) throws IOException { long currentSnapshot = pair.getLeft(); Long baseSnapshot = pair.getRight(); - createMetadataWithoutBase(currentSnapshot); + if (baseSnapshot == null) { + createMetadataWithoutBase(currentSnapshot); + } else { + createMetadataWithBase(committable, currentSnapshot, baseSnapshot); + } } private Pair getCurrentAndBaseSnapshotIds(long commitIdentifier) { @@ -162,7 +181,9 @@ 
private Pair getCurrentAndBaseSnapshotIds(long commitIdentifier) { Snapshot snapshot = snapshotManager.snapshot(id); if (!snapshot.commitUser().equals(commitUser) || snapshot.commitIdentifier() < commitIdentifier) { - baseSnapshotId = id; + if (table.fileIO().exists(pathFactory.toMetadataPath(id))) { + baseSnapshotId = id; + } break; } } catch (Exception ignore) { @@ -175,18 +196,14 @@ private Pair getCurrentAndBaseSnapshotIds(long commitIdentifier) { private void createMetadataWithoutBase(long snapshotId) throws IOException { SnapshotReader snapshotReader = table.newSnapshotReader().withSnapshot(snapshotId); + Function> dataSplitToManifestEntries = + s -> + s.convertToRawFiles().get().stream() + .map(r -> rawFileToManifestEntry(r, s.partition(), snapshotId)); Iterator entryIterator = snapshotReader.read().dataSplits().stream() .filter(DataSplit::rawConvertible) - .flatMap(s -> dataSplitToDataFileMeta(s).stream()) - .map( - m -> - new IcebergManifestEntry( - IcebergManifestEntry.Status.ADDED, - snapshotId, - snapshotId, - snapshotId, - m)) + .flatMap(dataSplitToManifestEntries) .iterator(); List manifestFileMetas = manifestFile.rollingWrite(entryIterator, snapshotId); @@ -229,19 +246,173 @@ private void createMetadataWithoutBase(long snapshotId) throws IOException { String.valueOf(snapshotId)); } - private List dataSplitToDataFileMeta(DataSplit dataSplit) { - List result = new ArrayList<>(); - for (RawFile rawFile : dataSplit.convertToRawFiles().get()) { - result.add( - new IcebergDataFileMeta( - IcebergDataFileMeta.Content.DATA, - rawFile.path(), - rawFile.format(), - dataSplit.partition(), - rawFile.rowCount(), - rawFile.fileSize())); + private void createMetadataWithBase( + ManifestCommittable committable, long currentSnapshotId, long baseSnapshotId) + throws IOException { + Set modifiedPartitions = + committable.fileCommittables().stream() + .map(CommitMessage::partition) + .collect(Collectors.toSet()); + List modifiedPartitionsList = new ArrayList<>(modifiedPartitions); + + List dataSplits = + table.newSnapshotReader() + .withPartitionFilter(modifiedPartitionsList) + .read() + .dataSplits(); + Map> currentRawFiles = new HashMap<>(); + for (DataSplit dataSplit : dataSplits) { + if (dataSplit.rawConvertible() && modifiedPartitions.contains(dataSplit.partition())) { + dataSplit + .convertToRawFiles() + .get() + .forEach( + r -> + currentRawFiles.put( + r.path(), Pair.of(r, dataSplit.partition()))); + } } - return result; + + IcebergMetadata baseMetadata = + IcebergMetadata.fromPath( + table.fileIO(), pathFactory.toMetadataPath(baseSnapshotId)); + List baseManifestFileMetas = + manifestList.read(baseMetadata.currentSnapshot().manifestList()); + RowType partitionType = table.schema().logicalPartitionType(); + PartitionPredicate predicate = + PartitionPredicate.fromMultiple(partitionType, modifiedPartitionsList); + + IcebergSnapshotSummary snapshotSummary = + new IcebergSnapshotSummary(IcebergSnapshotSummary.OPERATION_APPEND); + List newManifestFileMetas = new ArrayList<>(); + for (IcebergManifestFileMeta fileMeta : baseManifestFileMetas) { + // use partition predicate to only check modified partitions + int numFields = partitionType.getFieldCount(); + GenericRow minValues = new GenericRow(numFields); + GenericRow maxValues = new GenericRow(numFields); + for (int i = 0; i < numFields; i++) { + IcebergPartitionSummary summary = fileMeta.partitions().get(i); + DataType fieldType = partitionType.getTypeAt(i); + minValues.setField(i, IcebergConversions.toObject(fieldType, 
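+                // The bounds are stored in Iceberg's single-value binary encoding
+                // (little-endian primitives, UTF-8 strings), which the `toObject`
+                // method added in this patch decodes. A round-trip sketch, assuming
+                // an INT partition field:
+                //   ByteBuffer buf = IcebergConversions.toByteBuffer(DataTypes.INT(), 42);
+                //   IcebergConversions.toObject(DataTypes.INT(), buf.array()); // -> 42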
summary.lowerBound())); + maxValues.setField(i, IcebergConversions.toObject(fieldType, summary.upperBound())); + } + + if (predicate.test( + fileMeta.liveRowsCount(), + minValues, + maxValues, + // IcebergPartitionSummary only has `containsNull` field and does not have the + // exact number of nulls, so we set null count to 0 to not affect filtering + new GenericArray(new long[numFields]))) { + // check if any IcebergManifestEntry in this manifest file meta is removed + List entries = + manifestFile.read(new Path(fileMeta.manifestPath()).getName()); + Set removedPaths = new HashSet<>(); + for (IcebergManifestEntry entry : entries) { + if (entry.isLive() && modifiedPartitions.contains(entry.file().partition())) { + String path = entry.file().filePath(); + if (currentRawFiles.containsKey(path)) { + currentRawFiles.remove(path); + } else { + removedPaths.add(path); + } + } + } + + if (removedPaths.isEmpty()) { + // nothing is removed, use this file meta again + newManifestFileMetas.add(fileMeta); + } else { + // some file is removed, rewrite this file meta + snapshotSummary = + new IcebergSnapshotSummary(IcebergSnapshotSummary.OPERATION_OVERWRITE); + List newEntries = new ArrayList<>(); + for (IcebergManifestEntry entry : entries) { + if (entry.isLive()) { + newEntries.add( + new IcebergManifestEntry( + removedPaths.contains(entry.file().filePath()) + ? IcebergManifestEntry.Status.DELETED + : IcebergManifestEntry.Status.EXISTING, + entry.snapshotId(), + entry.sequenceNumber(), + entry.fileSequenceNumber(), + entry.file())); + } + } + newManifestFileMetas.addAll( + manifestFile.rollingWrite(newEntries.iterator(), currentSnapshotId)); + } + } + } + + if (!currentRawFiles.isEmpty()) { + // add new raw files + newManifestFileMetas.addAll( + manifestFile.rollingWrite( + currentRawFiles.values().stream() + .map( + p -> + rawFileToManifestEntry( + p.getLeft(), + p.getRight(), + currentSnapshotId)) + .iterator(), + currentSnapshotId)); + } + + String manifestListFileName = manifestList.writeWithoutRolling(newManifestFileMetas); + + // add new schema if needed + int schemaId = (int) table.schema().id(); + List schemas = baseMetadata.schemas(); + if (baseMetadata.currentSchemaId() != schemaId) { + schemas = new ArrayList<>(schemas); + schemas.add(new IcebergSchema(table.schema())); + } + + List snapshots = new ArrayList<>(baseMetadata.snapshots()); + snapshots.add( + new IcebergSnapshot( + currentSnapshotId, + currentSnapshotId, + System.currentTimeMillis(), + snapshotSummary, + pathFactory.toManifestListPath(manifestListFileName).toString(), + schemaId)); + + IcebergMetadata metadata = + new IcebergMetadata( + baseMetadata.tableUuid(), + baseMetadata.location(), + currentSnapshotId, + table.schema().highestFieldId(), + schemas, + schemaId, + baseMetadata.partitionSpecs(), + baseMetadata.lastPartitionId(), + snapshots, + (int) currentSnapshotId); + table.fileIO() + .tryToWriteAtomic(pathFactory.toMetadataPath(currentSnapshotId), metadata.toJson()); + table.fileIO() + .overwriteFileUtf8( + new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME), + String.valueOf(currentSnapshotId)); + } + + private IcebergManifestEntry rawFileToManifestEntry( + RawFile rawFile, BinaryRow partition, long snapshotId) { + IcebergDataFileMeta fileMeta = + new IcebergDataFileMeta( + IcebergDataFileMeta.Content.DATA, + rawFile.path(), + rawFile.format(), + partition, + rawFile.rowCount(), + rawFile.fileSize()); + return new IcebergManifestEntry( + IcebergManifestEntry.Status.ADDED, snapshotId, snapshotId, snapshotId, 
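+                            // the same Paimon snapshot id is used for Iceberg's snapshot id,
+                            // data sequence number and file sequence number of the new entry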
fileMeta); } private List getPartitionFields(RowType partitionType) { diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java index fb83dd52cec1..6ec6d66aa1dd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java @@ -18,13 +18,16 @@ package org.apache.paimon.iceberg.manifest; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DecimalType; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; +import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; import java.nio.charset.StandardCharsets; @@ -40,6 +43,8 @@ private IcebergConversions() {} private static final ThreadLocal ENCODER = ThreadLocal.withInitial(StandardCharsets.UTF_8::newEncoder); + private static final ThreadLocal DECODER = + ThreadLocal.withInitial(StandardCharsets.UTF_8::newDecoder); public static ByteBuffer toByteBuffer(DataType type, Object value) { switch (type.getTypeRoot()) { @@ -78,4 +83,37 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) { throw new UnsupportedOperationException("Cannot serialize type: " + type); } } + + public static Object toObject(DataType type, byte[] bytes) { + switch (type.getTypeRoot()) { + case BOOLEAN: + return bytes[0] != 0; + case INTEGER: + case DATE: + return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt(); + case BIGINT: + return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong(); + case FLOAT: + return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getFloat(); + case DOUBLE: + return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getDouble(); + case CHAR: + case VARCHAR: + try { + return BinaryString.fromString( + DECODER.get().decode(ByteBuffer.wrap(bytes)).toString()); + } catch (CharacterCodingException e) { + throw new RuntimeException("Failed to decode bytes as UTF-8", e); + } + case BINARY: + case VARBINARY: + return bytes; + case DECIMAL: + DecimalType decimalType = (DecimalType) type; + return Decimal.fromUnscaledBytes( + bytes, decimalType.getPrecision(), decimalType.getScale()); + default: + throw new UnsupportedOperationException("Cannot deserialize type: " + type); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java index 6b76fb7572a7..213ec01316b0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntry.java @@ -90,6 +90,10 @@ public Status status() { return status; } + public boolean isLive() { + return status == Status.ADDED || status == Status.EXISTING; + } + public long snapshotId() { return snapshotId; } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java index 571b249608fc..01b3b96acbf6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFileMeta.java @@ -157,6 +157,10 @@ public long deletedRowsCount() { return deletedRowsCount; } + public long liveRowsCount() { + return addedRowsCount + existingRowsCount; + } + public List partitions() { return partitions; } diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index 668d91d44bdf..661082f3523d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -76,20 +76,25 @@ public void testUnPartitionedPrimaryKeyTable() throws Exception { }, new String[] {"k1", "k2", "v1", "v2"}); - int numRecords = 1000; + int numRounds = 5; + int numRecords = 500; ThreadLocalRandom random = ThreadLocalRandom.current(); - List testRecords = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { - int k1 = random.nextInt(0, 100); - String k2 = String.valueOf(random.nextInt(1000, 1010)); - int v1 = random.nextInt(); - long v2 = random.nextLong(); - testRecords.add( - new TestRecord( - BinaryRow.EMPTY_ROW, - String.format("%d|%s", k1, k2), - String.format("%d|%d", v1, v2), - GenericRow.of(k1, BinaryString.fromString(k2), v1, v2))); + List> testRecords = new ArrayList<>(); + for (int r = 0; r < numRounds; r++) { + List round = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + int k1 = random.nextInt(0, 100); + String k2 = String.valueOf(random.nextInt(1000, 1010)); + int v1 = random.nextInt(); + long v2 = random.nextLong(); + round.add( + new TestRecord( + BinaryRow.EMPTY_ROW, + String.format("%d|%s", k1, k2), + String.format("%d|%d", v1, v2), + GenericRow.of(k1, BinaryString.fromString(k2), v1, v2))); + } + testRecords.add(round); } runCompatibilityTest( @@ -124,26 +129,31 @@ public void testPartitionedPrimaryKeyTable() throws Exception { return b; }; - int numRecords = 1000; + int numRounds = 5; + int numRecords = 500; ThreadLocalRandom random = ThreadLocalRandom.current(); - List testRecords = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { - int pt1 = random.nextInt(0, 2); - String pt2 = String.valueOf(random.nextInt(10, 12)); - String k = String.valueOf(random.nextInt(0, 100)); - int v1 = random.nextInt(); - long v2 = random.nextLong(); - testRecords.add( - new TestRecord( - binaryRow.apply(pt1, pt2), - String.format("%d|%s|%s", pt1, pt2, k), - String.format("%d|%d", v1, v2), - GenericRow.of( - pt1, - BinaryString.fromString(pt2), - BinaryString.fromString(k), - v1, - v2))); + List> testRecords = new ArrayList<>(); + for (int r = 0; r < numRounds; r++) { + List round = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + int pt1 = random.nextInt(0, 2); + String pt2 = String.valueOf(random.nextInt(10, 12)); + String k = String.valueOf(random.nextInt(0, 100)); + int v1 = random.nextInt(); + long v2 = random.nextLong(); + round.add( + new TestRecord( + binaryRow.apply(pt1, pt2), + String.format("%d|%s|%s", pt1, pt2, k), + String.format("%d|%d", v1, v2), + GenericRow.of( + pt1, + BinaryString.fromString(pt2), + BinaryString.fromString(k), + v1, + v2))); + } + testRecords.add(round); } runCompatibilityTest( @@ -200,53 +210,58 @@ public void testAppendOnlyTableWithAllTypes() throws Exception { return b; }; - int numRecords = 1000; + int numRounds = 5; + int numRecords = 500; ThreadLocalRandom random = ThreadLocalRandom.current(); - List 
testRecords = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { - int pt = random.nextInt(0, 2); - boolean vBoolean = random.nextBoolean(); - long vBigInt = random.nextLong(); - float vFloat = random.nextFloat(); - double vDouble = random.nextDouble(); - Decimal vDecimal = Decimal.fromUnscaledLong(random.nextLong(0, 100000000), 8, 3); - String vChar = String.valueOf(random.nextInt()); - String vVarChar = String.valueOf(random.nextInt()); - byte[] vBinary = String.valueOf(random.nextInt()).getBytes(); - byte[] vVarBinary = String.valueOf(random.nextInt()).getBytes(); - int vDate = random.nextInt(0, 30000); + List> testRecords = new ArrayList<>(); + for (int r = 0; r < numRounds; r++) { + List round = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + int pt = random.nextInt(0, 2); + boolean vBoolean = random.nextBoolean(); + long vBigInt = random.nextLong(); + float vFloat = random.nextFloat(); + double vDouble = random.nextDouble(); + Decimal vDecimal = Decimal.fromUnscaledLong(random.nextLong(0, 100000000), 8, 3); + String vChar = String.valueOf(random.nextInt()); + String vVarChar = String.valueOf(random.nextInt()); + byte[] vBinary = String.valueOf(random.nextInt()).getBytes(); + byte[] vVarBinary = String.valueOf(random.nextInt()).getBytes(); + int vDate = random.nextInt(0, 30000); - String k = - String.format( - "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s", - pt, - vBoolean, - vBigInt, - vFloat, - vDouble, - vDecimal, - vChar, - vVarChar, - new String(vBinary), - new String(vVarBinary), - LocalDate.ofEpochDay(vDate)); - testRecords.add( - new TestRecord( - binaryRow.apply(pt), - k, - "", - GenericRow.of( - pt, - vBoolean, - vBigInt, - vFloat, - vDouble, - vDecimal, - BinaryString.fromString(vChar), - BinaryString.fromString(vVarChar), - vBinary, - vVarBinary, - vDate))); + String k = + String.format( + "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s", + pt, + vBoolean, + vBigInt, + vFloat, + vDouble, + vDecimal, + vChar, + vVarChar, + new String(vBinary), + new String(vVarBinary), + LocalDate.ofEpochDay(vDate)); + round.add( + new TestRecord( + binaryRow.apply(pt), + k, + "", + GenericRow.of( + pt, + vBoolean, + vBigInt, + vFloat, + vDouble, + vDecimal, + BinaryString.fromString(vChar), + BinaryString.fromString(vVarChar), + vBinary, + vVarBinary, + vDate))); + } + testRecords.add(round); } runCompatibilityTest( @@ -275,7 +290,7 @@ private void runCompatibilityTest( RowType rowType, List partitionKeys, List primaryKeys, - List testRecords, + List> testRecords, Function icebergRecordToKey, Function icebergRecordToValue) throws Exception { @@ -302,34 +317,38 @@ private void runCompatibilityTest( TableCommitImpl commit = table.newCommit(commitUser); Map expected = new HashMap<>(); - for (TestRecord testRecord : testRecords) { - expected.put(testRecord.key, testRecord.value); - write.write(testRecord.record); - } + for (List round : testRecords) { + for (TestRecord testRecord : round) { + expected.put(testRecord.key, testRecord.value); + write.write(testRecord.record); + } - if (!primaryKeys.isEmpty()) { - for (BinaryRow partition : - testRecords.stream().map(t -> t.partition).collect(Collectors.toSet())) { - for (int b = 0; b < 2; b++) { - write.compact(partition, b, true); + if (!primaryKeys.isEmpty()) { + for (BinaryRow partition : + round.stream().map(t -> t.partition).collect(Collectors.toSet())) { + for (int b = 0; b < 2; b++) { + write.compact(partition, b, true); + } } } - } - commit.commit(1, write.prepareCommit(true, 1)); - write.close(); - commit.close(); + commit.commit(1, 
write.prepareCommit(true, 1)); - HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(), tempDir.toString()); - TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t"); - org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier); - CloseableIterable result = IcebergGenerics.read(icebergTable).build(); - Map actual = new HashMap<>(); - for (Record record : result) { - actual.put(icebergRecordToKey.apply(record), icebergRecordToValue.apply(record)); + HadoopCatalog icebergCatalog = + new HadoopCatalog(new Configuration(), tempDir.toString()); + TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t"); + org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier); + CloseableIterable result = IcebergGenerics.read(icebergTable).build(); + Map actual = new HashMap<>(); + for (Record record : result) { + actual.put(icebergRecordToKey.apply(record), icebergRecordToValue.apply(record)); + } + result.close(); + + assertThat(actual).isEqualTo(expected); } - result.close(); - assertThat(actual).isEqualTo(expected); + write.close(); + commit.close(); } private static class TestRecord { From 8f208ab9be2b79f876383f3ca84fcaabb2ae3ccd Mon Sep 17 00:00:00 2001 From: tsreaper Date: Fri, 12 Jul 2024 15:57:06 +0800 Subject: [PATCH 2/7] [fix] Add null tests --- .../iceberg/IcebergCompatibilityTest.java | 62 +++++++++++-------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index 661082f3523d..7abd52ce5780 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -218,16 +218,21 @@ public void testAppendOnlyTableWithAllTypes() throws Exception { List round = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { int pt = random.nextInt(0, 2); - boolean vBoolean = random.nextBoolean(); - long vBigInt = random.nextLong(); - float vFloat = random.nextFloat(); - double vDouble = random.nextDouble(); - Decimal vDecimal = Decimal.fromUnscaledLong(random.nextLong(0, 100000000), 8, 3); - String vChar = String.valueOf(random.nextInt()); - String vVarChar = String.valueOf(random.nextInt()); - byte[] vBinary = String.valueOf(random.nextInt()).getBytes(); - byte[] vVarBinary = String.valueOf(random.nextInt()).getBytes(); - int vDate = random.nextInt(0, 30000); + Boolean vBoolean = random.nextBoolean() ? random.nextBoolean() : null; + Long vBigInt = random.nextBoolean() ? random.nextLong() : null; + Float vFloat = random.nextBoolean() ? random.nextFloat() : null; + Double vDouble = random.nextBoolean() ? random.nextDouble() : null; + Decimal vDecimal = + random.nextBoolean() + ? Decimal.fromUnscaledLong(random.nextLong(0, 100000000), 8, 3) + : null; + String vChar = random.nextBoolean() ? String.valueOf(random.nextInt()) : null; + String vVarChar = random.nextBoolean() ? String.valueOf(random.nextInt()) : null; + byte[] vBinary = + random.nextBoolean() ? String.valueOf(random.nextInt()).getBytes() : null; + byte[] vVarBinary = + random.nextBoolean() ? String.valueOf(random.nextInt()).getBytes() : null; + Integer vDate = random.nextBoolean() ? 
random.nextInt(0, 30000) : null; String k = String.format( @@ -240,9 +245,9 @@ public void testAppendOnlyTableWithAllTypes() throws Exception { vDecimal, vChar, vVarChar, - new String(vBinary), - new String(vVarBinary), - LocalDate.ofEpochDay(vDate)); + vBinary == null ? null : new String(vBinary), + vVarBinary == null ? null : new String(vVarBinary), + vDate == null ? null : LocalDate.ofEpochDay(vDate)); round.add( new TestRecord( binaryRow.apply(pt), @@ -269,20 +274,23 @@ public void testAppendOnlyTableWithAllTypes() throws Exception { Collections.emptyList(), Collections.emptyList(), testRecords, - r -> - String.format( - "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s", - r.get(0), - r.get(1), - r.get(2), - r.get(3), - r.get(4), - r.get(5), - r.get(6), - r.get(7), - new String(r.get(8, ByteBuffer.class).array()), - new String(r.get(9, ByteBuffer.class).array()), - r.get(10)), + r -> { + ByteBuffer vBinary = r.get(8, ByteBuffer.class); + ByteBuffer vVarBinary = r.get(9, ByteBuffer.class); + return String.format( + "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s", + r.get(0), + r.get(1), + r.get(2), + r.get(3), + r.get(4), + r.get(5), + r.get(6), + r.get(7), + vBinary == null ? null : new String(vBinary.array()), + vVarBinary == null ? null : new String(vVarBinary.array()), + r.get(10)); + }, r -> ""); } From 33e74be507e40b3fd936bff51cee1a9b2ac4dcd3 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Tue, 23 Jul 2024 10:30:41 +0800 Subject: [PATCH 3/7] [fix] Use file changes from ManifestCommittable to produce Iceberg metadata --- ...ava => AbstractIcebergCommitCallback.java} | 344 +++++++++++------- .../AppendOnlyIcebergCommitCallback.java | 35 ++ .../PrimaryKeyIcebergCommitCallback.java | 36 ++ .../metadata/IcebergSnapshotSummary.java | 4 +- .../paimon/table/AbstractFileStoreTable.java | 7 +- .../table/AppendOnlyFileStoreTable.java | 15 + .../table/PrimaryKeyFileStoreTable.java | 14 + .../iceberg/IcebergCompatibilityTest.java | 230 +++++++----- 8 files changed, 465 insertions(+), 220 deletions(-) rename paimon-core/src/main/java/org/apache/paimon/iceberg/{IcebergCommitCallback.java => AbstractIcebergCommitCallback.java} (64%) create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/AppendOnlyIcebergCommitCallback.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/iceberg/PrimaryKeyIcebergCommitCallback.java diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java similarity index 64% rename from paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java rename to paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java index 4ebb5c35c8fe..52b9de1533d4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java @@ -37,6 +37,7 @@ import org.apache.paimon.iceberg.metadata.IcebergSchema; import org.apache.paimon.iceberg.metadata.IcebergSnapshot; import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; @@ -44,12 +45,14 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitCallback; import org.apache.paimon.table.sink.CommitMessage; +import 
org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; @@ -60,37 +63,37 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg readers can read * Paimon's {@link RawFile}. */ -public class IcebergCommitCallback implements CommitCallback { +public abstract class AbstractIcebergCommitCallback implements CommitCallback { // see org.apache.iceberg.hadoop.Util private static final String VERSION_HINT_FILENAME = "version-hint.text"; - private final FileStoreTable table; + protected final FileStoreTable table; private final String commitUser; private final IcebergPathFactory pathFactory; + private final FileStorePathFactory fileStorePathFactory; private final IcebergManifestFile manifestFile; private final IcebergManifestList manifestList; - public IcebergCommitCallback(FileStoreTable table, String commitUser) { + public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) { this.table = table; this.commitUser = commitUser; this.pathFactory = new IcebergPathFactory(table.location()); + this.fileStorePathFactory = table.store().pathFactory(); RowType partitionType = table.schema().logicalPartitionType(); RowType entryType = IcebergManifestEntry.schema(partitionType); @@ -196,14 +199,10 @@ private Pair getCurrentAndBaseSnapshotIds(long commitIdentifier) { private void createMetadataWithoutBase(long snapshotId) throws IOException { SnapshotReader snapshotReader = table.newSnapshotReader().withSnapshot(snapshotId); - Function> dataSplitToManifestEntries = - s -> - s.convertToRawFiles().get().stream() - .map(r -> rawFileToManifestEntry(r, s.partition(), snapshotId)); Iterator entryIterator = snapshotReader.read().dataSplits().stream() .filter(DataSplit::rawConvertible) - .flatMap(dataSplitToManifestEntries) + .flatMap(s -> dataSplitToManifestEntries(s, snapshotId).stream()) .iterator(); List manifestFileMetas = manifestFile.rollingWrite(entryIterator, snapshotId); @@ -217,7 +216,7 @@ private void createMetadataWithoutBase(long snapshotId) throws IOException { snapshotId, snapshotId, System.currentTimeMillis(), - new IcebergSnapshotSummary(IcebergSnapshotSummary.OPERATION_APPEND), + IcebergSnapshotSummary.APPEND, pathFactory.toManifestListPath(manifestListFileName).toString(), schemaId); @@ -246,92 +245,257 @@ private void createMetadataWithoutBase(long snapshotId) throws IOException { String.valueOf(snapshotId)); } + private List dataSplitToManifestEntries( + DataSplit dataSplit, long snapshotId) { + List result = new ArrayList<>(); + for (RawFile rawFile : dataSplit.convertToRawFiles().get()) { + IcebergDataFileMeta fileMeta = + new IcebergDataFileMeta( + 
IcebergDataFileMeta.Content.DATA,
+                            rawFile.path(),
+                            rawFile.format(),
+                            dataSplit.partition(),
+                            rawFile.rowCount(),
+                            rawFile.fileSize());
+            result.add(
+                    new IcebergManifestEntry(
+                            IcebergManifestEntry.Status.ADDED,
+                            snapshotId,
+                            snapshotId,
+                            snapshotId,
+                            fileMeta));
+        }
+        return result;
+    }
+
     private void createMetadataWithBase(
             ManifestCommittable committable, long currentSnapshotId, long baseSnapshotId)
             throws IOException {
-        Set<BinaryRow> modifiedPartitions =
+        Set<String> removedFiles = new LinkedHashSet<>();
+        Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles = new LinkedHashMap<>();
+        collectFileChanges(committable, removedFiles, addedFiles);
+        List<BinaryRow> modifiedPartitions =
                 committable.fileCommittables().stream()
                         .map(CommitMessage::partition)
-                        .collect(Collectors.toSet());
-        List<BinaryRow> modifiedPartitionsList = new ArrayList<>(modifiedPartitions);
-
-        List<DataSplit> dataSplits =
-                table.newSnapshotReader()
-                        .withPartitionFilter(modifiedPartitionsList)
-                        .read()
-                        .dataSplits();
-        Map<String, Pair<RawFile, BinaryRow>> currentRawFiles = new HashMap<>();
-        for (DataSplit dataSplit : dataSplits) {
-            if (dataSplit.rawConvertible() && modifiedPartitions.contains(dataSplit.partition())) {
-                dataSplit
-                        .convertToRawFiles()
-                        .get()
-                        .forEach(
-                                r ->
-                                        currentRawFiles.put(
-                                                r.path(), Pair.of(r, dataSplit.partition())));
-            }
-        }
+                        .distinct()
+                        .collect(Collectors.toList());

         IcebergMetadata baseMetadata =
                 IcebergMetadata.fromPath(
                         table.fileIO(), pathFactory.toMetadataPath(baseSnapshotId));
         List<IcebergManifestFileMeta> baseManifestFileMetas =
                 manifestList.read(baseMetadata.currentSnapshot().manifestList());
+
+        // Note that `isAddOnly(committable)` and `removedFiles.isEmpty()` may be different,
+        // because if a file's level is changed, it will first be removed and then added.
+        // In this case, if `baseMetadata` already contains this file, we should not add a
+        // duplicate.
+        IcebergSnapshotSummary snapshotSummary;
+        List<IcebergManifestFileMeta> newManifestFileMetas;
+        if (isAddOnly(committable)) {
+            // Fast case. We don't need to remove files from `baseMetadata`; we only need to
+            // append new metadata files.
+            snapshotSummary = IcebergSnapshotSummary.APPEND;
+            newManifestFileMetas = new ArrayList<>(baseManifestFileMetas);
+            newManifestFileMetas.addAll(
+                    createNewlyAddedManifestFileMetas(addedFiles, currentSnapshotId));
+        } else {
+            Pair<List<IcebergManifestFileMeta>, Boolean> result =
+                    createWithDeleteManifestFileMetas(
+                            removedFiles,
+                            addedFiles,
+                            modifiedPartitions,
+                            baseManifestFileMetas,
+                            currentSnapshotId);
+            snapshotSummary =
+                    result.getRight()
+                            ?
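+                            // getRight() reports whether every base manifest file could be
+                            // reused as-is; if any had to be rewritten, the snapshot must be
+                            // recorded as an overwrite rather than an append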
IcebergSnapshotSummary.APPEND + : IcebergSnapshotSummary.OVERWRITE; + newManifestFileMetas = result.getLeft(); + } + String manifestListFileName = manifestList.writeWithoutRolling(newManifestFileMetas); + + // add new schema if needed + int schemaId = (int) table.schema().id(); + List schemas = baseMetadata.schemas(); + if (baseMetadata.currentSchemaId() != schemaId) { + schemas = new ArrayList<>(schemas); + schemas.add(new IcebergSchema(table.schema())); + } + + List snapshots = new ArrayList<>(baseMetadata.snapshots()); + snapshots.add( + new IcebergSnapshot( + currentSnapshotId, + currentSnapshotId, + System.currentTimeMillis(), + snapshotSummary, + pathFactory.toManifestListPath(manifestListFileName).toString(), + schemaId)); + + IcebergMetadata metadata = + new IcebergMetadata( + baseMetadata.tableUuid(), + baseMetadata.location(), + currentSnapshotId, + table.schema().highestFieldId(), + schemas, + schemaId, + baseMetadata.partitionSpecs(), + baseMetadata.lastPartitionId(), + snapshots, + (int) currentSnapshotId); + table.fileIO() + .tryToWriteAtomic(pathFactory.toMetadataPath(currentSnapshotId), metadata.toJson()); + table.fileIO() + .overwriteFileUtf8( + new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME), + String.valueOf(currentSnapshotId)); + } + + private boolean isAddOnly(ManifestCommittable committable) { + for (CommitMessage message : committable.fileCommittables()) { + CommitMessageImpl m = (CommitMessageImpl) message; + if (!m.newFilesIncrement().deletedFiles().isEmpty() + || !m.compactIncrement().compactBefore().isEmpty()) { + return false; + } + } + return true; + } + + private void collectFileChanges( + ManifestCommittable committable, + Set removedFiles, + Map> addedFiles) { + for (CommitMessage message : committable.fileCommittables()) { + CommitMessageImpl m = (CommitMessageImpl) message; + String bucketPath = + fileStorePathFactory.bucketPath(m.partition(), m.bucket()).toString(); + for (DataFileMeta meta : m.newFilesIncrement().deletedFiles()) { + String path = bucketPath + "/" + meta.fileName(); + removedFiles.add(path); + } + for (DataFileMeta meta : m.newFilesIncrement().newFiles()) { + if (shouldAddFileToIceberg(meta)) { + String path = bucketPath + "/" + meta.fileName(); + removedFiles.remove(path); + addedFiles.put(path, Pair.of(m.partition(), meta)); + } + } + for (DataFileMeta meta : m.compactIncrement().compactBefore()) { + String path = bucketPath + "/" + meta.fileName(); + addedFiles.remove(path); + removedFiles.add(path); + } + for (DataFileMeta meta : m.compactIncrement().compactAfter()) { + if (shouldAddFileToIceberg(meta)) { + String path = bucketPath + "/" + meta.fileName(); + removedFiles.remove(path); + addedFiles.put(path, Pair.of(m.partition(), meta)); + } + } + } + } + + protected abstract boolean shouldAddFileToIceberg(DataFileMeta meta); + + private List createNewlyAddedManifestFileMetas( + Map> addedFiles, long currentSnapshotId) + throws IOException { + if (addedFiles.isEmpty()) { + return Collections.emptyList(); + } + + return manifestFile.rollingWrite( + addedFiles.entrySet().stream() + .map( + e -> { + IcebergDataFileMeta fileMeta = + new IcebergDataFileMeta( + IcebergDataFileMeta.Content.DATA, + e.getKey(), + e.getValue().getRight().fileFormat(), + e.getValue().getLeft(), + e.getValue().getRight().rowCount(), + e.getValue().getRight().fileSize()); + return new IcebergManifestEntry( + IcebergManifestEntry.Status.ADDED, + currentSnapshotId, + currentSnapshotId, + currentSnapshotId, + fileMeta); + }) + .iterator(), + 
currentSnapshotId);
+    }
+
+    private Pair<List<IcebergManifestFileMeta>, Boolean> createWithDeleteManifestFileMetas(
+            Set<String> removedFiles,
+            Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles,
+            List<BinaryRow> modifiedPartitions,
+            List<IcebergManifestFileMeta> baseManifestFileMetas,
+            long currentSnapshotId)
+            throws IOException {
+        boolean isAppend = true;
+        List<IcebergManifestFileMeta> newManifestFileMetas = new ArrayList<>();
+
         RowType partitionType = table.schema().logicalPartitionType();
         PartitionPredicate predicate =
-                PartitionPredicate.fromMultiple(partitionType, modifiedPartitionsList);
+                PartitionPredicate.fromMultiple(partitionType, modifiedPartitions);

-        IcebergSnapshotSummary snapshotSummary =
-                new IcebergSnapshotSummary(IcebergSnapshotSummary.OPERATION_APPEND);
-        List<IcebergManifestFileMeta> newManifestFileMetas = new ArrayList<>();
         for (IcebergManifestFileMeta fileMeta : baseManifestFileMetas) {
             // use the partition predicate to check only the modified partitions
             int numFields = partitionType.getFieldCount();
             GenericRow minValues = new GenericRow(numFields);
             GenericRow maxValues = new GenericRow(numFields);
+            long[] nullCounts = new long[numFields];
             for (int i = 0; i < numFields; i++) {
                 IcebergPartitionSummary summary = fileMeta.partitions().get(i);
                 DataType fieldType = partitionType.getTypeAt(i);
                 minValues.setField(i, IcebergConversions.toObject(fieldType, summary.lowerBound()));
                 maxValues.setField(i, IcebergConversions.toObject(fieldType, summary.upperBound()));
+                // IcebergPartitionSummary only has a `containsNull` flag and does not record
+                // the exact number of nulls.
+                nullCounts[i] = summary.containsNull() ? 1 : 0;
             }

-            if (predicate.test(
-                    fileMeta.liveRowsCount(),
-                    minValues,
-                    maxValues,
-                    // IcebergPartitionSummary only has `containsNull` field and does not have the
-                    // exact number of nulls, so we set null count to 0 to not affect filtering
-                    new GenericArray(new long[numFields]))) {
+            if (predicate == null
+                    || predicate.test(
+                            fileMeta.liveRowsCount(),
+                            minValues,
+                            maxValues,
+                            new GenericArray(nullCounts))) {
                 // check if any IcebergManifestEntry in this manifest file meta is removed
                 List<IcebergManifestEntry> entries =
                         manifestFile.read(new Path(fileMeta.manifestPath()).getName());
-                Set<String> removedPaths = new HashSet<>();
+                boolean canReuseFile = true;
                 for (IcebergManifestEntry entry : entries) {
-                    if (entry.isLive() && modifiedPartitions.contains(entry.file().partition())) {
+                    if (entry.isLive()) {
                         String path = entry.file().filePath();
-                        if (currentRawFiles.containsKey(path)) {
-                            currentRawFiles.remove(path);
-                        } else {
-                            removedPaths.add(path);
+                        if (addedFiles.containsKey(path)) {
+                            // this added file already exists (most probably due to a level
+                            // change); remove it so we don't add a duplicate
+                            addedFiles.remove(path);
+                        } else if (removedFiles.contains(path)) {
+                            canReuseFile = false;
                         }
                     }
                 }

-                if (removedPaths.isEmpty()) {
+                if (canReuseFile) {
                     // nothing is removed, use this file meta again
                     newManifestFileMetas.add(fileMeta);
                 } else {
                     // some file is removed, rewrite this file meta
-                    snapshotSummary =
-                            new IcebergSnapshotSummary(IcebergSnapshotSummary.OPERATION_OVERWRITE);
+                    isAppend = false;
                     List<IcebergManifestEntry> newEntries = new ArrayList<>();
                     for (IcebergManifestEntry entry : entries) {
                         if (entry.isLive()) {
                             newEntries.add(
                                     new IcebergManifestEntry(
-                                            removedPaths.contains(entry.file().filePath())
+                                            removedFiles.contains(entry.file().filePath())
                                                     ?
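+                                            // files removed by this commit are marked DELETED;
+                                            // surviving entries stay EXISTING and keep their
+                                            // original snapshot and sequence numbers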
IcebergManifestEntry.Status.DELETED : IcebergManifestEntry.Status.EXISTING, entry.snapshotId(), @@ -346,73 +510,9 @@ private void createMetadataWithBase( } } - if (!currentRawFiles.isEmpty()) { - // add new raw files - newManifestFileMetas.addAll( - manifestFile.rollingWrite( - currentRawFiles.values().stream() - .map( - p -> - rawFileToManifestEntry( - p.getLeft(), - p.getRight(), - currentSnapshotId)) - .iterator(), - currentSnapshotId)); - } - - String manifestListFileName = manifestList.writeWithoutRolling(newManifestFileMetas); - - // add new schema if needed - int schemaId = (int) table.schema().id(); - List schemas = baseMetadata.schemas(); - if (baseMetadata.currentSchemaId() != schemaId) { - schemas = new ArrayList<>(schemas); - schemas.add(new IcebergSchema(table.schema())); - } - - List snapshots = new ArrayList<>(baseMetadata.snapshots()); - snapshots.add( - new IcebergSnapshot( - currentSnapshotId, - currentSnapshotId, - System.currentTimeMillis(), - snapshotSummary, - pathFactory.toManifestListPath(manifestListFileName).toString(), - schemaId)); - - IcebergMetadata metadata = - new IcebergMetadata( - baseMetadata.tableUuid(), - baseMetadata.location(), - currentSnapshotId, - table.schema().highestFieldId(), - schemas, - schemaId, - baseMetadata.partitionSpecs(), - baseMetadata.lastPartitionId(), - snapshots, - (int) currentSnapshotId); - table.fileIO() - .tryToWriteAtomic(pathFactory.toMetadataPath(currentSnapshotId), metadata.toJson()); - table.fileIO() - .overwriteFileUtf8( - new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME), - String.valueOf(currentSnapshotId)); - } - - private IcebergManifestEntry rawFileToManifestEntry( - RawFile rawFile, BinaryRow partition, long snapshotId) { - IcebergDataFileMeta fileMeta = - new IcebergDataFileMeta( - IcebergDataFileMeta.Content.DATA, - rawFile.path(), - rawFile.format(), - partition, - rawFile.rowCount(), - rawFile.fileSize()); - return new IcebergManifestEntry( - IcebergManifestEntry.Status.ADDED, snapshotId, snapshotId, snapshotId, fileMeta); + newManifestFileMetas.addAll( + createNewlyAddedManifestFileMetas(addedFiles, currentSnapshotId)); + return Pair.of(newManifestFileMetas, isAppend); } private List getPartitionFields(RowType partitionType) { diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AppendOnlyIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AppendOnlyIcebergCommitCallback.java new file mode 100644 index 000000000000..3356107545f8 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AppendOnlyIcebergCommitCallback.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.iceberg; + +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.table.FileStoreTable; + +/** {@link AbstractIcebergCommitCallback} for append only tables. */ +public class AppendOnlyIcebergCommitCallback extends AbstractIcebergCommitCallback { + + public AppendOnlyIcebergCommitCallback(FileStoreTable table, String commitUser) { + super(table, commitUser); + } + + @Override + protected boolean shouldAddFileToIceberg(DataFileMeta meta) { + return true; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/PrimaryKeyIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/PrimaryKeyIcebergCommitCallback.java new file mode 100644 index 000000000000..12b9c3e604d3 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/PrimaryKeyIcebergCommitCallback.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg; + +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.table.FileStoreTable; + +/** {@link AbstractIcebergCommitCallback} for primary key tables. 
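+ * Only files on the maximum level are added to the Iceberg metadata, because rows
+ * in lower levels may still be merged by future compactions and thus cannot be
+ * read as raw files by Iceberg readers.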
*/ +public class PrimaryKeyIcebergCommitCallback extends AbstractIcebergCommitCallback { + + public PrimaryKeyIcebergCommitCallback(FileStoreTable table, String commitUser) { + super(table, commitUser); + } + + @Override + protected boolean shouldAddFileToIceberg(DataFileMeta meta) { + int maxLevel = table.coreOptions().numLevels() - 1; + return meta.level() == maxLevel; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java index 0c70331eebb0..c620c4809808 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSnapshotSummary.java @@ -35,8 +35,8 @@ public class IcebergSnapshotSummary { private static final String FIELD_OPERATION = "operation"; - public static final String OPERATION_APPEND = "append"; - public static final String OPERATION_OVERWRITE = "overwrite"; + public static final IcebergSnapshotSummary APPEND = new IcebergSnapshotSummary("append"); + public static final IcebergSnapshotSummary OVERWRITE = new IcebergSnapshotSummary("overwrite"); @JsonProperty(FIELD_OPERATION) private final String operation; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index a2609cee4e64..badbf1a4526f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -24,7 +24,6 @@ import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; -import org.apache.paimon.iceberg.IcebergCommitCallback; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.metastore.AddPartitionCommitCallback; @@ -397,7 +396,7 @@ public TableCommitImpl newCommit(String commitUser) { options.forceCreatingSnapshot()); } - private List createCommitCallbacks(String commitUser) { + protected List createCommitCallbacks(String commitUser) { List callbacks = new ArrayList<>(CallbackUtils.loadCommitCallbacks(coreOptions())); CoreOptions options = coreOptions(); @@ -423,10 +422,6 @@ private List createCommitCallbacks(String commitUser) { callbacks.add(callback); } - if (options.metadataIcebergCompatible()) { - callbacks.add(new IcebergCommitCallback(this, commitUser)); - } - return callbacks; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 0af78a5dac8b..4b6dd9b08270 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.iceberg.AppendOnlyIcebergCommitCallback; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.operation.AppendOnlyFileStoreScan; import org.apache.paimon.operation.AppendOnlyFileStoreWrite; @@ -32,6 +33,7 @@ import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.query.LocalTableQuery; +import 
org.apache.paimon.table.sink.CommitCallback; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.AbstractDataTableRead; import org.apache.paimon.table.source.AppendOnlySplitGenerator; @@ -43,6 +45,7 @@ import org.apache.paimon.utils.Preconditions; import java.io.IOException; +import java.util.List; import java.util.function.BiConsumer; /** {@link FileStoreTable} for append table. */ @@ -157,4 +160,16 @@ public TableWriteImpl newWrite( public LocalTableQuery newLocalTableQuery() { throw new UnsupportedOperationException(); } + + @Override + protected List createCommitCallbacks(String commitUser) { + List callbacks = super.createCommitCallbacks(commitUser); + CoreOptions options = coreOptions(); + + if (options.metadataIcebergCompatible()) { + callbacks.add(new AppendOnlyIcebergCommitCallback(this, commitUser)); + } + + return callbacks; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index b1e5b5366c3d..9c15a7bd1f3d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -23,6 +23,7 @@ import org.apache.paimon.KeyValueFileStore; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.iceberg.PrimaryKeyIcebergCommitCallback; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.mergetree.compact.LookupMergeFunction; import org.apache.paimon.mergetree.compact.MergeFunctionFactory; @@ -33,6 +34,7 @@ import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.query.LocalTableQuery; +import org.apache.paimon.table.sink.CommitCallback; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.KeyValueTableRead; @@ -177,4 +179,16 @@ public TableWriteImpl newWrite( public LocalTableQuery newLocalTableQuery() { return new LocalTableQuery(this); } + + @Override + protected List createCommitCallbacks(String commitUser) { + List callbacks = super.createCommitCallbacks(commitUser); + CoreOptions options = coreOptions(); + + if (options.metadataIcebergCompatible()) { + callbacks.add(new PrimaryKeyIcebergCommitCallback(this, commitUser)); + } + + return callbacks; + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index 7abd52ce5780..9b682ee253de 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -51,7 +51,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -59,6 +59,7 @@ import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; @@ -67,6 +68,40 @@ public class IcebergCompatibilityTest { @TempDir java.nio.file.Path tempDir; + // ------------------------------------------------------------------------ + // Constructed Tests 
+ // ------------------------------------------------------------------------ + + @Test + public void testFileLevelChange() throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"}); + FileStoreTable table = + createPaimonTable( + rowType, Collections.emptyList(), Collections.singletonList("k"), 1); + + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser); + + write.write(GenericRow.of(1, 10)); + write.write(GenericRow.of(2, 20)); + commit.commit(1, write.prepareCommit(false, 1)); + assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)"); + + write.compact(BinaryRow.EMPTY_ROW, 0, true); + commit.commit(2, write.prepareCommit(true, 2)); + assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)"); + + write.close(); + commit.close(); + } + + // ------------------------------------------------------------------------ + // Random Tests + // ------------------------------------------------------------------------ + @Test public void testUnPartitionedPrimaryKeyTable() throws Exception { RowType rowType = @@ -80,6 +115,8 @@ public void testUnPartitionedPrimaryKeyTable() throws Exception { int numRecords = 500; ThreadLocalRandom random = ThreadLocalRandom.current(); List> testRecords = new ArrayList<>(); + List> expected = new ArrayList<>(); + Map expectedMap = new LinkedHashMap<>(); for (int r = 0; r < numRounds; r++) { List round = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { @@ -90,11 +127,14 @@ public void testUnPartitionedPrimaryKeyTable() throws Exception { round.add( new TestRecord( BinaryRow.EMPTY_ROW, - String.format("%d|%s", k1, k2), - String.format("%d|%d", v1, v2), GenericRow.of(k1, BinaryString.fromString(k2), v1, v2))); + expectedMap.put(String.format("%d, %s", k1, k2), String.format("%d, %d", v1, v2)); } testRecords.add(round); + expected.add( + expectedMap.entrySet().stream() + .map(e -> String.format("Record(%s, %s)", e.getKey(), e.getValue())) + .collect(Collectors.toList())); } runCompatibilityTest( @@ -102,8 +142,8 @@ public void testUnPartitionedPrimaryKeyTable() throws Exception { Collections.emptyList(), Arrays.asList("k1", "k2"), testRecords, - r -> String.format("%d|%s", r.get(0, Integer.class), r.get(1, String.class)), - r -> String.format("%d|%d", r.get(2, Integer.class), r.get(3, Long.class))); + expected, + Record::toString); } @Test @@ -129,10 +169,12 @@ public void testPartitionedPrimaryKeyTable() throws Exception { return b; }; - int numRounds = 5; - int numRecords = 500; + int numRounds = 2; + int numRecords = 3; ThreadLocalRandom random = ThreadLocalRandom.current(); List> testRecords = new ArrayList<>(); + List> expected = new ArrayList<>(); + Map expectedMap = new LinkedHashMap<>(); for (int r = 0; r < numRounds; r++) { List round = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { @@ -144,16 +186,20 @@ public void testPartitionedPrimaryKeyTable() throws Exception { round.add( new TestRecord( binaryRow.apply(pt1, pt2), - String.format("%d|%s|%s", pt1, pt2, k), - String.format("%d|%d", v1, v2), GenericRow.of( pt1, BinaryString.fromString(pt2), BinaryString.fromString(k), v1, v2))); + expectedMap.put( + String.format("%d, %s, %s", pt1, pt2, k), String.format("%d, %d", v1, v2)); } testRecords.add(round); + expected.add( + expectedMap.entrySet().stream() + .map(e -> String.format("Record(%s, %s)", 
e.getKey(), e.getValue())) + .collect(Collectors.toList())); } runCompatibilityTest( @@ -161,13 +207,8 @@ public void testPartitionedPrimaryKeyTable() throws Exception { Arrays.asList("pt1", "pt2"), Arrays.asList("pt1", "pt2", "k"), testRecords, - r -> - String.format( - "%d|%s|%s", - r.get(0, Integer.class), - r.get(1, String.class), - r.get(2, String.class)), - r -> String.format("%d|%d", r.get(3, Integer.class), r.get(4, Long.class))); + expected, + Record::toString); } @Test @@ -214,6 +255,8 @@ public void testAppendOnlyTableWithAllTypes() throws Exception { int numRecords = 500; ThreadLocalRandom random = ThreadLocalRandom.current(); List> testRecords = new ArrayList<>(); + List> expected = new ArrayList<>(); + List currentExpected = new ArrayList<>(); for (int r = 0; r < numRounds; r++) { List round = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { @@ -234,25 +277,9 @@ public void testAppendOnlyTableWithAllTypes() throws Exception { random.nextBoolean() ? String.valueOf(random.nextInt()).getBytes() : null; Integer vDate = random.nextBoolean() ? random.nextInt(0, 30000) : null; - String k = - String.format( - "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s", - pt, - vBoolean, - vBigInt, - vFloat, - vDouble, - vDecimal, - vChar, - vVarChar, - vBinary == null ? null : new String(vBinary), - vVarBinary == null ? null : new String(vVarBinary), - vDate == null ? null : LocalDate.ofEpochDay(vDate)); round.add( new TestRecord( binaryRow.apply(pt), - k, - "", GenericRow.of( pt, vBoolean, @@ -265,8 +292,23 @@ public void testAppendOnlyTableWithAllTypes() throws Exception { vBinary, vVarBinary, vDate))); + currentExpected.add( + String.format( + "%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s", + pt, + vBoolean, + vBigInt, + vFloat, + vDouble, + vDecimal, + vChar, + vVarChar, + vBinary == null ? null : new String(vBinary), + vVarBinary == null ? null : new String(vVarBinary), + vDate == null ? null : LocalDate.ofEpochDay(vDate))); } testRecords.add(round); + expected.add(new ArrayList<>(currentExpected)); } runCompatibilityTest( @@ -274,24 +316,19 @@ public void testAppendOnlyTableWithAllTypes() throws Exception { Collections.emptyList(), Collections.emptyList(), testRecords, - r -> { - ByteBuffer vBinary = r.get(8, ByteBuffer.class); - ByteBuffer vVarBinary = r.get(9, ByteBuffer.class); - return String.format( - "%s|%s|%s|%s|%s|%s|%s|%s|%s|%s|%s", - r.get(0), - r.get(1), - r.get(2), - r.get(3), - r.get(4), - r.get(5), - r.get(6), - r.get(7), - vBinary == null ? null : new String(vBinary.array()), - vVarBinary == null ? 
null : new String(vVarBinary.array()), - r.get(10)); - }, - r -> ""); + expected, + r -> + IntStream.range(0, rowType.getFieldCount()) + .mapToObj( + i -> { + Object field = r.get(i); + if (field instanceof ByteBuffer) { + return new String(((ByteBuffer) field).array()); + } else { + return String.valueOf(field); + } + }) + .collect(Collectors.joining(", "))); } private void runCompatibilityTest( @@ -299,35 +336,20 @@ private void runCompatibilityTest( List partitionKeys, List primaryKeys, List> testRecords, - Function icebergRecordToKey, - Function icebergRecordToValue) + List> expected, + Function icebergRecordToString) throws Exception { - LocalFileIO fileIO = LocalFileIO.create(); - Path path = new Path(tempDir.toString()); - - Options options = new Options(); - if (!primaryKeys.isEmpty()) { - options.set(CoreOptions.BUCKET, 2); - } - options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true); - options.set(CoreOptions.FILE_FORMAT, "avro"); - Schema schema = - new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), ""); - - FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path); - paimonCatalog.createDatabase("mydb", false); - Identifier paimonIdentifier = Identifier.create("mydb", "t"); - paimonCatalog.createTable(paimonIdentifier, schema, false); - FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier); + FileStoreTable table = + createPaimonTable( + rowType, partitionKeys, primaryKeys, primaryKeys.isEmpty() ? -1 : 2); String commitUser = UUID.randomUUID().toString(); TableWriteImpl write = table.newWrite(commitUser); TableCommitImpl commit = table.newCommit(commitUser); - Map expected = new HashMap<>(); - for (List round : testRecords) { + for (int r = 0; r < testRecords.size(); r++) { + List round = testRecords.get(r); for (TestRecord testRecord : round) { - expected.put(testRecord.key, testRecord.value); write.write(testRecord.record); } @@ -339,20 +361,9 @@ private void runCompatibilityTest( } } } - commit.commit(1, write.prepareCommit(true, 1)); - - HadoopCatalog icebergCatalog = - new HadoopCatalog(new Configuration(), tempDir.toString()); - TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t"); - org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier); - CloseableIterable result = IcebergGenerics.read(icebergTable).build(); - Map actual = new HashMap<>(); - for (Record record : result) { - actual.put(icebergRecordToKey.apply(record), icebergRecordToValue.apply(record)); - } - result.close(); + commit.commit(r, write.prepareCommit(true, r)); - assertThat(actual).isEqualTo(expected); + assertThat(getIcebergResult(icebergRecordToString)).hasSameElementsAs(expected.get(r)); } write.close(); @@ -361,15 +372,54 @@ private void runCompatibilityTest( private static class TestRecord { private final BinaryRow partition; - private final String key; - private final String value; private final GenericRow record; - private TestRecord(BinaryRow partition, String key, String value, GenericRow record) { + private TestRecord(BinaryRow partition, GenericRow record) { this.partition = partition; - this.key = key; - this.value = value; this.record = record; } } + + // ------------------------------------------------------------------------ + // Utils + // ------------------------------------------------------------------------ + + private FileStoreTable createPaimonTable( + RowType rowType, List partitionKeys, List primaryKeys, int numBuckets) + throws Exception { + LocalFileIO fileIO = 
LocalFileIO.create(); + Path path = new Path(tempDir.toString()); + + Options options = new Options(); + options.set(CoreOptions.BUCKET, numBuckets); + options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true); + options.set(CoreOptions.FILE_FORMAT, "avro"); + Schema schema = + new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), ""); + + try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path)) { + paimonCatalog.createDatabase("mydb", false); + Identifier paimonIdentifier = Identifier.create("mydb", "t"); + paimonCatalog.createTable(paimonIdentifier, schema, false); + return (FileStoreTable) paimonCatalog.getTable(paimonIdentifier); + } + } + + private List getIcebergResult() throws Exception { + return getIcebergResult(Record::toString); + } + + private List getIcebergResult(Function icebergRecordToString) + throws Exception { + HadoopCatalog icebergCatalog = new HadoopCatalog(new Configuration(), tempDir.toString()); + TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t"); + org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier); + CloseableIterable result = IcebergGenerics.read(icebergTable).build(); + List actual = new ArrayList<>(); + for (Record record : result) { + actual.add(icebergRecordToString.apply(record)); + } + result.close(); + return actual; + } } From fd70185460135e340d70b664b16c0fa7f50515c4 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Tue, 23 Jul 2024 14:08:53 +0800 Subject: [PATCH 4/7] [fix] Add tests for deletion vector + iceberg metadata --- .../iceberg/IcebergCompatibilityTest.java | 78 ++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index 9b682ee253de..30932d8b0bc5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.disk.IOManagerImpl; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; @@ -35,6 +36,7 @@ import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.hadoop.conf.Configuration; @@ -51,6 +53,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -98,6 +101,69 @@ public void testFileLevelChange() throws Exception { commit.close(); } + @Test + public void testDelete() throws Exception { + testDeleteImpl(false); + } + + @Test + public void testDeleteWithDeletionVector() throws Exception { + testDeleteImpl(true); + } + + private void testDeleteImpl(boolean deletionVector) throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"}); + Map customOptions = new HashMap<>(); + if (deletionVector) { + customOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"); + } + FileStoreTable table = + createPaimonTable( + rowType, + 
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        1,
+                        customOptions);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write =
+                table.newWrite(commitUser)
+                        .withIOManager(new IOManagerImpl(tempDir.toString() + "/tmp"));
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 10));
+        write.write(GenericRow.of(2, 20));
+        commit.commit(1, write.prepareCommit(false, 1));
+        assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)");
+
+        write.write(GenericRow.of(2, 21));
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(2, write.prepareCommit(true, 2));
+        assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 21)");
+
+        write.write(GenericRow.ofKind(RowKind.DELETE, 1, 10));
+        commit.commit(3, write.prepareCommit(false, 3));
+        // not changed because there has been no full compaction yet
+        assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 21)");
+
+        write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit.commit(4, write.prepareCommit(true, 4));
+        if (deletionVector) {
+            // the level 0 file is compacted into a deletion vector, so the max level data file
+            // does not change; this is still a valid table state at some point in the history
+            assertThat(getIcebergResult())
+                    .containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 21)");
+        } else {
+            assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(2, 21)");
+        }
+
+        write.close();
+        commit.close();
+    }
+
     // ------------------------------------------------------------------------
     //  Random Tests
     // ------------------------------------------------------------------------
 
     @Test
     public void testUnPartitionedPrimaryKeyTable() throws Exception {
@@ -387,10 +453,20 @@ private TestRecord(BinaryRow partition, GenericRow record) {
     private FileStoreTable createPaimonTable(
             RowType rowType, List<String> partitionKeys, List<String> primaryKeys, int numBuckets)
             throws Exception {
+        return createPaimonTable(rowType, partitionKeys, primaryKeys, numBuckets, new HashMap<>());
+    }
+
+    private FileStoreTable createPaimonTable(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            int numBuckets,
+            Map<String, String> customOptions)
+            throws Exception {
         LocalFileIO fileIO = LocalFileIO.create();
         Path path = new Path(tempDir.toString());
 
-        Options options = new Options();
+        Options options = new Options(customOptions);
         options.set(CoreOptions.BUCKET, numBuckets);
         options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
         options.set(CoreOptions.FILE_FORMAT, "avro");

From b78b92350f7900127e83681a875c961835fdb9e9 Mon Sep 17 00:00:00 2001
From: tsreaper
Date: Tue, 23 Jul 2024 14:12:22 +0800
Subject: [PATCH 5/7] [fix] Fix comments

---
 .../paimon/iceberg/AbstractIcebergCommitCallback.java      | 6 ++++--
 .../apache/paimon/iceberg/manifest/IcebergConversions.java | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
index 52b9de1533d4..2730a0d50373 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
@@ -454,8 +454,10 @@ private Pair<List<IcebergManifestFileMeta>, Boolean> createWithDeleteManifestFil
         for (int i = 0; i < numFields; i++) {
             IcebergPartitionSummary summary = fileMeta.partitions().get(i);
             DataType fieldType = partitionType.getTypeAt(i);
-            minValues.setField(i, IcebergConversions.toObject(fieldType,
summary.lowerBound())); - maxValues.setField(i, IcebergConversions.toObject(fieldType, summary.upperBound())); + minValues.setField( + i, IcebergConversions.toPaimonObject(fieldType, summary.lowerBound())); + maxValues.setField( + i, IcebergConversions.toPaimonObject(fieldType, summary.upperBound())); // IcebergPartitionSummary only has `containsNull` field and does not have the // exact number of nulls. nullCounts[i] = summary.containsNull() ? 1 : 0; diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java index 6ec6d66aa1dd..1d9e1c3b16e9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java @@ -84,7 +84,7 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) { } } - public static Object toObject(DataType type, byte[] bytes) { + public static Object toPaimonObject(DataType type, byte[] bytes) { switch (type.getTypeRoot()) { case BOOLEAN: return bytes[0] != 0; From c42f915af6c6ec59c4816eec3eec32b38d6e3ce8 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Tue, 23 Jul 2024 14:30:10 +0800 Subject: [PATCH 6/7] [fix] Remove useless local variable --- .../org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java index 2730a0d50373..be6d7345e3a9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java @@ -286,7 +286,6 @@ private void createMetadataWithBase( List baseManifestFileMetas = manifestList.read(baseMetadata.currentSnapshot().manifestList()); - Pair, Boolean> createManifestFileMetasResult; // Note that `isAddOnly(commitable)` and `removedFiles.isEmpty()` may be different, // because if a file's level is changed, it will first be removed and then added. 
 // In this case, if `baseMetadata` already contains this file, we should not add a

From 89be30d34cfde6c39c711ef76bbbb34cd1a9b76a Mon Sep 17 00:00:00 2001
From: tsreaper
Date: Mon, 19 Aug 2024 16:01:39 +0800
Subject: [PATCH 7/7] [fix] Optimize IcebergCommitCallback by directly using
 committed ManifestEntry

---
 .../AbstractIcebergCommitCallback.java        | 265 ++++++++----------
 .../metastore/AddPartitionCommitCallback.java |   6 +-
 .../metastore/TagPreviewCommitCallback.java   |   8 +-
 .../paimon/operation/FileStoreCommitImpl.java |   2 +-
 .../paimon/table/sink/CommitCallback.java     |   5 +-
 .../iceberg/IcebergCompatibilityTest.java     |  81 ++++++
 .../paimon/table/sink/TableCommitTest.java    |   7 +-
 .../paimon/spark/PaimonCommitTest.scala       |   7 +-
 8 files changed, 218 insertions(+), 163 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
index be6d7345e3a9..210e56ed9dff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
@@ -44,21 +44,17 @@
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitCallback;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.RawFile;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
@@ -132,69 +128,55 @@ public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) {
     }
 
     @Override
-    public void call(
-            List<ManifestEntry> committedEntries, long identifier, @Nullable Long watermark) {
-        try {
-            commitMetadata(identifier);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+    public void call(List<ManifestEntry> committedEntries, Snapshot snapshot) {
+        createMetadata(
+                snapshot.id(),
+                (removedFiles, addedFiles) ->
+                        collectFileChanges(committedEntries, removedFiles, addedFiles));
     }
 
     @Override
     public void retry(ManifestCommittable committable) {
-        try {
-            commitMetadata(committable.identifier());
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        SnapshotManager snapshotManager = table.snapshotManager();
+        long snapshotId =
+                snapshotManager
+                        .findSnapshotsForIdentifiers(
+                                commitUser, Collections.singletonList(committable.identifier()))
+                        .stream()
+                        .mapToLong(Snapshot::id)
+                        .max()
+                        .orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                "There is no snapshot for commit user "
+                                                        + commitUser
+                                                        + " and identifier "
+                                                        + committable.identifier()
+                                                        + ". This is unexpected."));
+        createMetadata(
+                snapshotId,
+                (removedFiles, addedFiles) ->
+                        collectFileChanges(snapshotId, removedFiles, addedFiles));
     }
 
-    private void commitMetadata(long identifier) throws IOException {
-        Pair<Long, Long> pair = getCurrentAndBaseSnapshotIds(identifier);
-        long currentSnapshot = pair.getLeft();
-        Long baseSnapshot = pair.getRight();
-
-        if (baseSnapshot == null) {
-            createMetadataWithoutBase(currentSnapshot);
-        } else {
-            createMetadataWithBase(committable, currentSnapshot, baseSnapshot);
-        }
-    }
+    private void createMetadata(long snapshotId, FileChangesCollector fileChangesCollector) {
+        try {
+            if (table.fileIO().exists(pathFactory.toMetadataPath(snapshotId))) {
+                return;
+            }
 
-    private Pair<Long, Long> getCurrentAndBaseSnapshotIds(long commitIdentifier) {
-        SnapshotManager snapshotManager = table.snapshotManager();
-        List<Snapshot> currentSnapshots =
-                snapshotManager.findSnapshotsForIdentifiers(
-                        commitUser, Collections.singletonList(commitIdentifier));
-        Preconditions.checkArgument(
-                currentSnapshots.size() == 1,
-                "Cannot find snapshot with user {} and identifier {}",
-                commitUser,
-                commitIdentifier);
-        long currentSnapshotId = currentSnapshots.get(0).id();
-
-        long earliest =
-                Preconditions.checkNotNull(
-                        snapshotManager.earliestSnapshotId(),
-                        "Cannot determine earliest snapshot ID. This is unexpected.");
-        Long baseSnapshotId = null;
-        for (long id = currentSnapshotId - 1; id >= earliest; id--) {
-            try {
-                Snapshot snapshot = snapshotManager.snapshot(id);
-                if (!snapshot.commitUser().equals(commitUser)
-                        || snapshot.commitIdentifier() < commitIdentifier) {
-                    if (table.fileIO().exists(pathFactory.toMetadataPath(id))) {
-                        baseSnapshotId = id;
-                    }
-                    break;
-                }
-            } catch (Exception ignore) {
-                break;
-            }
-        }
-
-        return Pair.of(currentSnapshotId, baseSnapshotId);
+            Path baseMetadataPath = pathFactory.toMetadataPath(snapshotId - 1);
+            if (table.fileIO().exists(baseMetadataPath)) {
+                createMetadataWithBase(
+                        fileChangesCollector,
+                        snapshotId,
+                        IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath));
+            } else {
+                createMetadataWithoutBase(snapshotId);
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
     }
 
     private void createMetadataWithoutBase(long snapshotId) throws IOException {
@@ -269,49 +251,43 @@ private List<IcebergManifestEntry> dataSplitToManifestEntries(
     }
 
     private void createMetadataWithBase(
-            ManifestCommittable committable, long currentSnapshotId, long baseSnapshotId)
+            FileChangesCollector fileChangesCollector,
+            long snapshotId,
+            IcebergMetadata baseMetadata)
             throws IOException {
-        Set<String> removedFiles = new LinkedHashSet<>();
-        Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles = new LinkedHashMap<>();
-        collectFileChanges(committable, removedFiles, addedFiles);
-        List<BinaryRow> modifiedPartitions =
-                committable.fileCommittables().stream()
-                        .map(CommitMessage::partition)
-                        .distinct()
-                        .collect(Collectors.toList());
-
-        IcebergMetadata baseMetadata =
-                IcebergMetadata.fromPath(
-                        table.fileIO(), pathFactory.toMetadataPath(baseSnapshotId));
         List<IcebergManifestFileMeta> baseManifestFileMetas =
                 manifestList.read(baseMetadata.currentSnapshot().manifestList());
 
+        Map<String, BinaryRow> removedFiles = new LinkedHashMap<>();
+        Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles = new LinkedHashMap<>();
+        boolean isAddOnly = fileChangesCollector.collect(removedFiles, addedFiles);
+        Set<BinaryRow> modifiedPartitionsSet = new LinkedHashSet<>(removedFiles.values());
+        modifiedPartitionsSet.addAll(
+                addedFiles.values().stream().map(Pair::getLeft).collect(Collectors.toList()));
+        List<BinaryRow> modifiedPartitions = new
ArrayList<>(modifiedPartitionsSet); + + // Note that this check may be different from `removedFiles.isEmpty()`, // because if a file's level is changed, it will first be removed and then added. // In this case, if `baseMetadata` already contains this file, we should not add a // duplicate. - IcebergSnapshotSummary snapshotSummary; List newManifestFileMetas; - if (isAddOnly(committable)) { + IcebergSnapshotSummary snapshotSummary; + if (isAddOnly) { // Fast case. We don't need to remove files from `baseMetadata`. We only need to append // new metadata files. - snapshotSummary = IcebergSnapshotSummary.APPEND; newManifestFileMetas = new ArrayList<>(baseManifestFileMetas); - newManifestFileMetas.addAll( - createNewlyAddedManifestFileMetas(addedFiles, currentSnapshotId)); + newManifestFileMetas.addAll(createNewlyAddedManifestFileMetas(addedFiles, snapshotId)); + snapshotSummary = IcebergSnapshotSummary.APPEND; } else { - Pair, Boolean> result = + Pair, IcebergSnapshotSummary> result = createWithDeleteManifestFileMetas( removedFiles, addedFiles, modifiedPartitions, baseManifestFileMetas, - currentSnapshotId); - snapshotSummary = - result.getRight() - ? IcebergSnapshotSummary.APPEND - : IcebergSnapshotSummary.OVERWRITE; + snapshotId); newManifestFileMetas = result.getLeft(); + snapshotSummary = result.getRight(); } String manifestListFileName = manifestList.writeWithoutRolling(newManifestFileMetas); @@ -326,8 +302,8 @@ private void createMetadataWithBase( List snapshots = new ArrayList<>(baseMetadata.snapshots()); snapshots.add( new IcebergSnapshot( - currentSnapshotId, - currentSnapshotId, + snapshotId, + snapshotId, System.currentTimeMillis(), snapshotSummary, pathFactory.toManifestListPath(manifestListFileName).toString(), @@ -337,65 +313,71 @@ private void createMetadataWithBase( new IcebergMetadata( baseMetadata.tableUuid(), baseMetadata.location(), - currentSnapshotId, + snapshotId, table.schema().highestFieldId(), schemas, schemaId, baseMetadata.partitionSpecs(), baseMetadata.lastPartitionId(), snapshots, - (int) currentSnapshotId); - table.fileIO() - .tryToWriteAtomic(pathFactory.toMetadataPath(currentSnapshotId), metadata.toJson()); + (int) snapshotId); + table.fileIO().tryToWriteAtomic(pathFactory.toMetadataPath(snapshotId), metadata.toJson()); table.fileIO() .overwriteFileUtf8( new Path(pathFactory.metadataDirectory(), VERSION_HINT_FILENAME), - String.valueOf(currentSnapshotId)); + String.valueOf(snapshotId)); } - private boolean isAddOnly(ManifestCommittable committable) { - for (CommitMessage message : committable.fileCommittables()) { - CommitMessageImpl m = (CommitMessageImpl) message; - if (!m.newFilesIncrement().deletedFiles().isEmpty() - || !m.compactIncrement().compactBefore().isEmpty()) { - return false; - } - } - return true; + private interface FileChangesCollector { + boolean collect( + Map removedFiles, + Map> addedFiles) + throws IOException; } - private void collectFileChanges( - ManifestCommittable committable, - Set removedFiles, + private boolean collectFileChanges( + List manifestEntries, + Map removedFiles, Map> addedFiles) { - for (CommitMessage message : committable.fileCommittables()) { - CommitMessageImpl m = (CommitMessageImpl) message; - String bucketPath = - fileStorePathFactory.bucketPath(m.partition(), m.bucket()).toString(); - for (DataFileMeta meta : m.newFilesIncrement().deletedFiles()) { - String path = bucketPath + "/" + meta.fileName(); - removedFiles.add(path); - } - for (DataFileMeta meta : m.newFilesIncrement().newFiles()) { - if 
(shouldAddFileToIceberg(meta)) { - String path = bucketPath + "/" + meta.fileName(); - removedFiles.remove(path); - addedFiles.put(path, Pair.of(m.partition(), meta)); - } - } - for (DataFileMeta meta : m.compactIncrement().compactBefore()) { - String path = bucketPath + "/" + meta.fileName(); - addedFiles.remove(path); - removedFiles.add(path); - } - for (DataFileMeta meta : m.compactIncrement().compactAfter()) { - if (shouldAddFileToIceberg(meta)) { - String path = bucketPath + "/" + meta.fileName(); - removedFiles.remove(path); - addedFiles.put(path, Pair.of(m.partition(), meta)); - } + boolean isAddOnly = true; + for (ManifestEntry entry : manifestEntries) { + String path = + fileStorePathFactory.bucketPath(entry.partition(), entry.bucket()) + + "/" + + entry.fileName(); + switch (entry.kind()) { + case ADD: + if (shouldAddFileToIceberg(entry.file())) { + removedFiles.remove(path); + addedFiles.put(path, Pair.of(entry.partition(), entry.file())); + } + break; + case DELETE: + isAddOnly = false; + addedFiles.remove(path); + removedFiles.put(path, entry.partition()); + break; + default: + throw new UnsupportedOperationException( + "Unknown ManifestEntry FileKind " + entry.kind()); } } + return isAddOnly; + } + + private boolean collectFileChanges( + long snapshotId, + Map removedFiles, + Map> addedFiles) { + return collectFileChanges( + table.store() + .newScan() + .withKind(ScanMode.DELTA) + .withSnapshot(snapshotId) + .plan() + .files(), + removedFiles, + addedFiles); } protected abstract boolean shouldAddFileToIceberg(DataFileMeta meta); @@ -430,14 +412,15 @@ private List createNewlyAddedManifestFileMetas( currentSnapshotId); } - private Pair, Boolean> createWithDeleteManifestFileMetas( - Set removedFiles, - Map> addedFiles, - List modifiedPartitions, - List baseManifestFileMetas, - long currentSnapshotId) - throws IOException { - boolean isAppend = true; + private Pair, IcebergSnapshotSummary> + createWithDeleteManifestFileMetas( + Map removedFiles, + Map> addedFiles, + List modifiedPartitions, + List baseManifestFileMetas, + long currentSnapshotId) + throws IOException { + IcebergSnapshotSummary snapshotSummary = IcebergSnapshotSummary.APPEND; List newManifestFileMetas = new ArrayList<>(); RowType partitionType = table.schema().logicalPartitionType(); @@ -479,7 +462,7 @@ private Pair, Boolean> createWithDeleteManifestFil // added file already exists (most probably due to level changes), // remove it to not add a duplicate. addedFiles.remove(path); - } else if (removedFiles.contains(path)) { + } else if (removedFiles.containsKey(path)) { canReuseFile = false; } } @@ -490,13 +473,13 @@ private Pair, Boolean> createWithDeleteManifestFil newManifestFileMetas.add(fileMeta); } else { // some file is removed, rewrite this file meta - isAppend = false; + snapshotSummary = IcebergSnapshotSummary.OVERWRITE; List newEntries = new ArrayList<>(); for (IcebergManifestEntry entry : entries) { if (entry.isLive()) { newEntries.add( new IcebergManifestEntry( - removedFiles.contains(entry.file().filePath()) + removedFiles.containsKey(entry.file().filePath()) ? 
IcebergManifestEntry.Status.DELETED : IcebergManifestEntry.Status.EXISTING, entry.snapshotId(), @@ -513,7 +496,7 @@ private Pair, Boolean> createWithDeleteManifestFil newManifestFileMetas.addAll( createNewlyAddedManifestFileMetas(addedFiles, currentSnapshotId)); - return Pair.of(newManifestFileMetas, isAppend); + return Pair.of(newManifestFileMetas, snapshotSummary); } private List getPartitionFields(RowType partitionType) { diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java index 27468bc2fe6b..4f7d3d554ae2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java @@ -18,6 +18,7 @@ package org.apache.paimon.metastore; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestCommittable; @@ -28,8 +29,6 @@ import org.apache.paimon.shade.guava30.com.google.common.cache.Cache; import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder; -import javax.annotation.Nullable; - import java.time.Duration; import java.util.List; @@ -52,8 +51,7 @@ public AddPartitionCommitCallback(MetastoreClient client) { } @Override - public void call( - List committedEntries, long identifier, @Nullable Long watermark) { + public void call(List committedEntries, Snapshot snapshot) { committedEntries.stream() .filter(e -> FileKind.ADD.equals(e.kind())) .map(ManifestEntry::partition) diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java index 3c477f917936..7e2e00dce3dc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java @@ -18,13 +18,12 @@ package org.apache.paimon.metastore; +import org.apache.paimon.Snapshot; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.table.sink.CommitCallback; import org.apache.paimon.tag.TagPreview; -import javax.annotation.Nullable; - import java.util.List; import java.util.Optional; @@ -40,10 +39,9 @@ public TagPreviewCommitCallback(AddPartitionTagCallback tagCallback, TagPreview } @Override - public void call( - List committedEntries, long identifier, @Nullable Long watermark) { + public void call(List committedEntries, Snapshot snapshot) { long currentMillis = System.currentTimeMillis(); - Optional tagOptional = tagPreview.extractTag(currentMillis, watermark); + Optional tagOptional = tagPreview.extractTag(currentMillis, snapshot.watermark()); tagOptional.ifPresent(tagCallback::notifyCreation); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 80ce33ce34de..d2034d71c487 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -1007,7 +1007,7 @@ boolean tryCommitOnce( identifier, commitKind.name())); } - commitCallbacks.forEach(callback -> callback.call(tableFiles, identifier, watermark)); + 
commitCallbacks.forEach(callback -> callback.call(tableFiles, newSnapshot)); return true; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java index 23f39229de3d..c4b606441df6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java @@ -18,11 +18,10 @@ package org.apache.paimon.table.sink; +import org.apache.paimon.Snapshot; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; -import javax.annotation.Nullable; - import java.util.List; /** @@ -38,7 +37,7 @@ */ public interface CommitCallback extends AutoCloseable { - void call(List committedEntries, long identifier, @Nullable Long watermark); + void call(List committedEntries, Snapshot snapshot); void retry(ManifestCommittable committable); } diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index 30932d8b0bc5..4efeaa855fb2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -31,7 +31,10 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.types.DataType; @@ -164,6 +167,84 @@ private void testDeleteImpl(boolean deletionVector) throws Exception { commit.close(); } + @Test + public void testRetryCreateMetadata() throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"}); + FileStoreTable table = + createPaimonTable( + rowType, Collections.emptyList(), Collections.singletonList("k"), 1); + + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser); + + write.write(GenericRow.of(1, 10)); + write.write(GenericRow.of(2, 20)); + commit.commit(1, write.prepareCommit(false, 1)); + assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)"); + + write.write(GenericRow.of(1, 11)); + write.write(GenericRow.of(3, 30)); + write.compact(BinaryRow.EMPTY_ROW, 0, true); + List commitMessages2 = write.prepareCommit(true, 2); + commit.commit(2, commitMessages2); + assertThat(table.latestSnapshotId()).hasValue(3L); + + IcebergPathFactory pathFactory = new IcebergPathFactory(table.location()); + Path metadata3Path = pathFactory.toMetadataPath(3); + assertThat(table.fileIO().exists(metadata3Path)).isTrue(); + + table.fileIO().deleteQuietly(metadata3Path); + Map> retryMessages = new HashMap<>(); + retryMessages.put(2L, commitMessages2); + commit.filterAndCommit(retryMessages); + assertThat(getIcebergResult()) + .containsExactlyInAnyOrder("Record(1, 11)", "Record(2, 20)", "Record(3, 30)"); + + write.close(); + commit.close(); + } + + @Test + public void testSchemaChange() throws Exception { + RowType rowType = + RowType.of( + new 
DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"k", "v"}); + FileStoreTable table = + createPaimonTable( + rowType, Collections.emptyList(), Collections.singletonList("k"), 1); + + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser); + + write.write(GenericRow.of(1, 10)); + write.write(GenericRow.of(2, 20)); + commit.commit(1, write.prepareCommit(false, 1)); + assertThat(getIcebergResult()).containsExactlyInAnyOrder("Record(1, 10)", "Record(2, 20)"); + + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); + schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.STRING())); + table = table.copyWithLatestSchema(); + write.close(); + write = table.newWrite(commitUser); + commit.close(); + commit = table.newCommit(commitUser); + + write.write(GenericRow.of(1, 11, BinaryString.fromString("one"))); + write.write(GenericRow.of(3, 30, BinaryString.fromString("three"))); + write.compact(BinaryRow.EMPTY_ROW, 0, true); + commit.commit(2, write.prepareCommit(true, 2)); + assertThat(getIcebergResult()) + .containsExactlyInAnyOrder( + "Record(1, 11, one)", "Record(2, 20, null)", "Record(3, 30, three)"); + + write.close(); + commit.close(); + } + // ------------------------------------------------------------------------ // Random Tests // ------------------------------------------------------------------------ diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java index 18f858c23ec7..227a3b58eeed 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.sink; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; @@ -42,8 +43,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import javax.annotation.Nullable; - import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -180,8 +179,8 @@ public TestCommitCallback(String testId) { } @Override - public void call(List entries, long identifier, @Nullable Long watermark) { - commitCallbackResult.get(testId).add(identifier); + public void call(List entries, Snapshot snapshot) { + commitCallbackResult.get(testId).add(snapshot.commitIdentifier()); } @Override diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala index 0095e1024a86..c93e10ef90b9 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala @@ -18,7 +18,7 @@ package org.apache.paimon.spark -import org.apache.paimon.CoreOptions +import org.apache.paimon.{CoreOptions, Snapshot} import org.apache.paimon.manifest.{ManifestCommittable, ManifestEntry} import org.apache.paimon.table.sink.CommitCallback @@ -64,10 +64,7 @@ object PaimonCommitTest { case class CustomCommitCallback(testId: String) extends CommitCallback { - override def call( - committedEntries: List[ManifestEntry], - 
identifier: Long, - watermark: lang.Long): Unit = { + override def call(committedEntries: List[ManifestEntry], snapshot: Snapshot): Unit = { PaimonCommitTest.id = testId }
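
For reviewers who want to try the revised contract locally, here is a minimal sketch of a custom callback against the new `CommitCallback` interface from PATCH 7/7. The class name `LoggingCommitCallback` is hypothetical and not part of this series; the sketch relies only on methods visible in the diffs above (`Snapshot#id`, `Snapshot#commitIdentifier`, `Snapshot#watermark`, `ManifestEntry#kind`).

// Hypothetical example, not part of this patch series: a CommitCallback built
// against the revised interface, which receives the committed Snapshot instead
// of a raw identifier and watermark.
import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.sink.CommitCallback;

import java.util.List;

public class LoggingCommitCallback implements CommitCallback {

    @Override
    public void call(List<ManifestEntry> committedEntries, Snapshot snapshot) {
        long addedEntries =
                committedEntries.stream()
                        .filter(e -> FileKind.ADD.equals(e.kind()))
                        .count();
        // The snapshot now carries the commit identifier and watermark, so they
        // no longer need to be passed as separate arguments.
        System.out.printf(
                "snapshot %d (identifier %d): %d added entries, watermark %s%n",
                snapshot.id(), snapshot.commitIdentifier(), addedEntries, snapshot.watermark());
    }

    @Override
    public void retry(ManifestCommittable committable) {
        // A purely observational callback has nothing to redo on retry.
    }

    @Override
    public void close() {}
}

A table would register such a callback through the `createCommitCallbacks(String commitUser)` overrides added earlier in this series; the exact registration hook for user-defined callbacks may differ outside these diffs.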