From 759f6fbba1d19106da603ca5455de206fcc04ed7 Mon Sep 17 00:00:00 2001
From: LsomeYeah
Date: Fri, 10 Jan 2025 17:24:47 +0800
Subject: [PATCH 1/2] [core] support iceberg schema evolution

---
 .../iceberg/manifest/IcebergDataFileMeta.java |  12 +
 .../iceberg/migrate/IcebergMigrator.java      |  98 +++-
 .../apache/paimon/migrate/FileMetaUtils.java  |  67 ++-
 .../apache/paimon/schema/SchemaManager.java   |   2 +-
 .../iceberg/migrate/IcebergMigrateTest.java   | 419 +++++++++++++++---
 5 files changed, 503 insertions(+), 95 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
index d171962becad..cb78c3c646b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
@@ -83,6 +83,9 @@ public static Content fromId(int id) {
     private final InternalMap lowerBounds;
     private final InternalMap upperBounds;
 
+    // only used for iceberg migrate
+    private long schemaId = 0;
+
     IcebergDataFileMeta(
             Content content,
             String filePath,
@@ -201,6 +204,15 @@ public InternalMap upperBounds() {
         return upperBounds;
     }
 
+    public long schemaId() {
+        return schemaId;
+    }
+
+    public IcebergDataFileMeta withSchemaId(long schemaId) {
+        this.schemaId = schemaId;
+        return this;
+    }
+
     public static RowType schema(RowType partitionType) {
         List<DataField> fields = new ArrayList<>();
         fields.add(new DataField(134, "content", DataTypes.INT().notNull()));
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
index 9e91fa2d18a8..e2f56c437538 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -43,19 +43,26 @@
 import org.apache.paimon.migrate.Migrator;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -136,15 +143,28 @@ public IcebergMigrator(
 
     @Override
     public void executeMigrate() throws Exception {
-        Schema paimonSchema = icebergSchemaToPaimonSchema(icebergMetadata);
+        List<TableSchema> paimonSchemas = icebergSchemasToPaimonSchemas(icebergMetadata);
+        Preconditions.checkArgument(
+                !paimonSchemas.isEmpty(),
+                "Paimon schemas transformed from the Iceberg table are empty.");
 
         Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableName);
         paimonCatalog.createDatabase(paimonDatabaseName, true);
-        paimonCatalog.createTable(paimonIdentifier, paimonSchema, false);
+        TableSchema firstSchema =
                paimonSchemas.get(0);
+        Preconditions.checkArgument(
+                firstSchema.id() == 0, "Unexpected: the first schema id is not 0.");
+        paimonCatalog.createTable(paimonIdentifier, firstSchema.toSchema(), false);
 
         try {
             FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
             FileIO fileIO = paimonTable.fileIO();
 
+            SchemaManager schemaManager = paimonTable.schemaManager();
+            // commit all the iceberg schemas except the first one, which was already
+            // committed when the paimon table was created
+            for (int i = 1; i < paimonSchemas.size(); i++) {
+                LOG.info(
+                        "Commit new schema from Iceberg, new schema id: {}",
+                        paimonSchemas.get(i).id());
+                schemaManager.commit(paimonSchemas.get(i));
+            }
 
             IcebergManifestFile manifestFile =
                     IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
@@ -157,25 +177,36 @@ public void executeMigrate() throws Exception {
             // check manifest file with 'DELETE' kind
             checkAndFilterManifestFiles(icebergManifestFileMetas);
 
-            // get all live iceberg entries
-            List<IcebergManifestEntry> icebergEntries =
-                    icebergManifestFileMetas.stream()
-                            .flatMap(fileMeta -> manifestFile.read(fileMeta).stream())
-                            .filter(IcebergManifestEntry::isLive)
-                            .collect(Collectors.toList());
-            if (icebergEntries.isEmpty()) {
+            Map<Long, List<IcebergManifestEntry>> icebergEntries = new HashMap<>();
+            for (IcebergManifestFileMeta icebergManifestFileMeta : icebergManifestFileMetas) {
+                long schemaId =
+                        getSchemaIdFromIcebergManifestFile(
+                                new Path(icebergManifestFileMeta.manifestPath()), fileIO);
+                List<IcebergManifestEntry> entries = manifestFile.read(icebergManifestFileMeta);
+                icebergEntries
+                        .computeIfAbsent(schemaId, v -> new ArrayList<>())
+                        .addAll(
+                                entries.stream()
+                                        .filter(IcebergManifestEntry::isLive)
+                                        .collect(Collectors.toList()));
+            }
+
+            List<IcebergDataFileMeta> icebergDataFileMetas = new ArrayList<>();
+            // write schema id to IcebergDataFileMeta
+            for (Map.Entry<Long, List<IcebergManifestEntry>> kv : icebergEntries.entrySet()) {
+                icebergDataFileMetas.addAll(
+                        kv.getValue().stream()
+                                .map(entry -> entry.file().withSchemaId(kv.getKey()))
+                                .collect(Collectors.toList()));
+            }
+
+            if (icebergDataFileMetas.isEmpty()) {
                 LOG.info(
-                        "No live manifest entry in iceberg table for snapshot {}, iceberg table meta path is {}.",
+                        "No live Iceberg data files in the Iceberg table for snapshot {}, Iceberg table meta path is {}.",
                         icebergMetadata.currentSnapshotId(),
                         icebergLatestMetadataLocation);
                 return;
             }
-
-            List<IcebergDataFileMeta> icebergDataFileMetas =
-                    icebergEntries.stream()
-                            .map(IcebergManifestEntry::file)
-                            .collect(Collectors.toList());
-
             // Again, check if delete File exists
             checkAndFilterDataFiles(icebergDataFileMetas);
 
@@ -246,10 +277,21 @@ public void renameTable(boolean ignoreIfNotExists) throws Exception {
         paimonCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists);
     }
 
-    public Schema icebergSchemaToPaimonSchema(IcebergMetadata icebergMetadata) {
-        // get iceberg current schema
-        IcebergSchema icebergSchema =
-                icebergMetadata.schemas().get(icebergMetadata.currentSchemaId());
+    public List<TableSchema> icebergSchemasToPaimonSchemas(IcebergMetadata icebergMetadata) {
+        return icebergMetadata.schemas().stream()
+                .map(
+                        icebergSchema -> {
+                            LOG.info(
+                                    "Convert Iceberg schema to Paimon schema, Iceberg schema id: {}",
+                                    icebergSchema.schemaId());
+                            return TableSchema.create(
+                                    icebergSchema.schemaId(),
+                                    icebergSchemaToPaimonSchema(icebergSchema));
+                        })
+                .collect(Collectors.toList());
+    }
+
+    public Schema icebergSchemaToPaimonSchema(IcebergSchema icebergSchema) {
 
         // get iceberg current partition spec
         int currentPartitionSpecId = icebergMetadata.defaultSpecId();
@@ -289,6 +331,18 @@ private void checkAndFilterDataFiles(List icebergDataFileMe
         }
     }
 
+    public long
getSchemaIdFromIcebergManifestFile(Path manifestPath, FileIO fileIO) { + + try (DataFileStream dataFileStream = + new DataFileStream<>( + fileIO.newInputStream(manifestPath), new GenericDatumReader<>())) { + String schema = dataFileStream.getMetaString("schema"); + return JsonSerdeUtil.fromJson(schema, IcebergSchema.class).schemaId(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private static List construct( List icebergDataFileMetas, FileIO fileIO, @@ -318,7 +372,9 @@ private static DataFileMeta constructFileMeta( e); } String format = icebergDataFileMeta.fileFormat(); - return FileMetaUtils.constructFileMeta(format, status, fileIO, table, dir, rollback); + long schemaId = icebergDataFileMeta.schemaId(); + return FileMetaUtils.constructFileMeta( + format, status, fileIO, table, dir, rollback, schemaId); } private MigrateTask importUnPartitionedTable( diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java index 366f8afcfd38..405870d5fa03 100644 --- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java @@ -129,6 +129,47 @@ public static DataFileMeta constructFileMeta( } } + public static DataFileMeta constructFileMeta( + String format, + FileStatus fileStatus, + FileIO fileIO, + Table table, + Path dir, + Map rollback, + long schemaId) { + + try { + RowType rowTypeWithSchemaId = + ((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType(); + SimpleColStatsCollector.Factory[] factories = + StatsCollectorFactories.createStatsFactories( + ((FileStoreTable) table).coreOptions(), + rowTypeWithSchemaId.getFieldNames()); + + SimpleStatsExtractor simpleStatsExtractor = + FileFormat.fromIdentifier( + format, + ((FileStoreTable) table).coreOptions().toConfiguration()) + .createStatsExtractor(rowTypeWithSchemaId, factories) + .orElseThrow( + () -> + new RuntimeException( + "Can't get table stats extractor for format " + + format)); + Path newPath = renameFile(fileIO, fileStatus.getPath(), dir, format, rollback); + return constructFileMeta( + newPath.getName(), + fileStatus.getLen(), + newPath, + simpleStatsExtractor, + fileIO, + table, + schemaId); + } catch (IOException e) { + throw new RuntimeException("error when construct file meta", e); + } + } + // -----------------------------private method--------------------------------------------- private static Path renameFile( @@ -152,7 +193,29 @@ private static DataFileMeta constructFileMeta( FileIO fileIO, Table table) throws IOException { - SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(table.rowType()); + return constructFileMeta( + fileName, + fileSize, + path, + simpleStatsExtractor, + fileIO, + table, + ((FileStoreTable) table).schema().id()); + } + + private static DataFileMeta constructFileMeta( + String fileName, + long fileSize, + Path path, + SimpleStatsExtractor simpleStatsExtractor, + FileIO fileIO, + Table table, + long schemaId) + throws IOException { + RowType rowTypeWithSchemaId = + ((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType(); + + SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(rowTypeWithSchemaId); Pair fileInfo = simpleStatsExtractor.extractWithFileInfo(fileIO, path); @@ -165,7 +228,7 @@ private static DataFileMeta constructFileMeta( stats, 0, 0, - ((FileStoreTable) table).schema().id(), + schemaId, Collections.emptyList(), 
null, FileSource.APPEND, diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 753bc34d95ef..d15312fce1f7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -769,7 +769,7 @@ protected void updateLastColumn(List newFields, String fieldName) } @VisibleForTesting - boolean commit(TableSchema newSchema) throws Exception { + public boolean commit(TableSchema newSchema) throws Exception { SchemaValidation.validateTableSchema(newSchema); SchemaValidation.validateFallbackBranch(this, newSchema); Path schemaPath = toSchemaPath(newSchema.id()); diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java index aadaca0c3854..455ecbd57ca5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java @@ -64,8 +64,10 @@ import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; @@ -118,27 +120,7 @@ public void beforeEach() throws Exception { public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception { Table icebergTable = createIcebergTable(isPartitioned); String format = "parquet"; - List records1 = - Stream.of( - toIcebergRecord(1, 1, "20240101", "00"), - toIcebergRecord(2, 2, "20240101", "00")) - .collect(Collectors.toList()); - if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00"); - } else { - writeRecordsToIceberg(icebergTable, format, records1); - } - - List records2 = - Stream.of( - toIcebergRecord(1, 1, "20240101", "01"), - toIcebergRecord(2, 2, "20240101", "01")) - .collect(Collectors.toList()); - if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01"); - } else { - writeRecordsToIceberg(icebergTable, format, records2); - } + writeInitialData(icebergTable, format, isPartitioned); IcebergMigrator icebergMigrator = new IcebergMigrator( @@ -160,8 +142,11 @@ public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception { .map(row -> String.format("Record(%s)", row)) .collect(Collectors.toList())) .hasSameElementsAs( - Stream.concat(records1.stream(), records2.stream()) - .map(GenericRecord::toString) + Stream.of( + "Record(1, 1, 20240101, 00)", + "Record(2, 2, 20240101, 00)", + "Record(1, 1, 20240101, 01)", + "Record(2, 2, 20240101, 01)") .collect(Collectors.toList())); // verify iceberg table has been deleted @@ -173,27 +158,7 @@ public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception { public void testMigrateAddAndDelete(boolean isPartitioned) throws Exception { Table icebergTable = createIcebergTable(isPartitioned); String format = "parquet"; - List records1 = - Stream.of( - toIcebergRecord(1, 1, "20240101", "00"), - toIcebergRecord(2, 2, "20240101", "00")) - .collect(Collectors.toList()); - if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00"); - } else { - writeRecordsToIceberg(icebergTable, format, records1); - } - - List records2 = - Stream.of( - toIcebergRecord(1, 1, 
"20240101", "01"), - toIcebergRecord(2, 2, "20240101", "01")) - .collect(Collectors.toList()); - if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01"); - } else { - writeRecordsToIceberg(icebergTable, format, records2); - } + writeInitialData(icebergTable, format, isPartitioned); // the file written with records2 will be deleted and generate a delete manifest entry, not // a delete file @@ -218,8 +183,7 @@ public void testMigrateAddAndDelete(boolean isPartitioned) throws Exception { .map(row -> String.format("Record(%s)", row)) .collect(Collectors.toList())) .hasSameElementsAs( - records2.stream() - .map(GenericRecord::toString) + Stream.of("Record(1, 1, 20240101, 01)", "Record(2, 2, 20240101, 01)") .collect(Collectors.toList())); } @@ -324,45 +288,115 @@ public void testMigrateWithRandomIcebergData(boolean isPartitioned) throws Excep @ParameterizedTest(name = "isPartitioned = {0}") @ValueSource(booleans = {true, false}) - public void testMigrateWithSchemaEvolution(boolean isPartitioned) throws Exception { + public void testDeleteColumn(boolean isPartitioned) throws Exception { Table icebergTable = createIcebergTable(isPartitioned); String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); - // write base data - List records1 = + icebergTable.updateSchema().deleteColumn("v").commit(); + Schema newIceSchema = icebergTable.schema(); + List addedRecords = Stream.of( - toIcebergRecord(1, 1, "20240101", "00"), - toIcebergRecord(2, 2, "20240101", "00")) + toIcebergRecord(3, "20240101", "00", newIceSchema), + toIcebergRecord(4, "20240101", "00", newIceSchema)) .collect(Collectors.toList()); if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00"); + writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); } else { - writeRecordsToIceberg(icebergTable, format, records1); + writeRecordsToIceberg(icebergTable, format, addedRecords); } - List records2 = + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + assertThat( + paiResults.stream() + .map(row -> String.format("Record(%s)", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + Stream.of( + "Record(1, 20240101, 00)", + "Record(2, 20240101, 00)", + "Record(1, 20240101, 01)", + "Record(2, 20240101, 01)", + "Record(3, 20240101, 00)", + "Record(4, 20240101, 00)") + .collect(Collectors.toList())); + } + + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testRenameColumn(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); + + icebergTable.updateSchema().renameColumn("v", "v2").commit(); + Schema newIceSchema = icebergTable.schema(); + List addedRecords = Stream.of( - toIcebergRecord(1, 1, "20240101", "01"), - toIcebergRecord(2, 2, "20240101", "01")) + toIcebergRecord(newIceSchema, 3, 3, "20240101", "00"), + toIcebergRecord(newIceSchema, 4, 4, "20240101", "00")) .collect(Collectors.toList()); if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01"); + writeRecordsToIceberg(icebergTable, 
format, addedRecords, "20240101", "00"); } else { - writeRecordsToIceberg(icebergTable, format, records2); + writeRecordsToIceberg(icebergTable, format, addedRecords); } - // TODO: currently only support schema evolution of deleting columns - testDeleteColumn(icebergTable, format, isPartitioned); + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + assertThat( + paiResults.stream() + .map(row -> String.format("Record(%s)", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + Stream.of( + "Record(1, 1, 20240101, 00)", + "Record(2, 2, 20240101, 00)", + "Record(1, 1, 20240101, 01)", + "Record(2, 2, 20240101, 01)", + "Record(3, 3, 20240101, 00)", + "Record(4, 4, 20240101, 00)") + .collect(Collectors.toList())); } - private void testDeleteColumn(Table icebergTable, String format, boolean isPartitioned) - throws Exception { - icebergTable.updateSchema().deleteColumn("v").commit(); + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testAddColumn(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); + + icebergTable.updateSchema().addColumn("v2", Types.IntegerType.get()).commit(); Schema newIceSchema = icebergTable.schema(); List addedRecords = Stream.of( - toIcebergRecord(3, "20240101", "00", newIceSchema), - toIcebergRecord(4, "20240101", "00", newIceSchema)) + toIcebergRecord(newIceSchema, 3, 3, "20240101", "00", 3), + toIcebergRecord(newIceSchema, 4, 4, "20240101", "00", 4)) .collect(Collectors.toList()); if (isPartitioned) { writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); @@ -390,15 +424,233 @@ private void testDeleteColumn(Table icebergTable, String format, boolean isParti .collect(Collectors.toList())) .hasSameElementsAs( Stream.of( - "Record(1, 20240101, 00)", - "Record(2, 20240101, 00)", - "Record(1, 20240101, 01)", - "Record(2, 20240101, 01)", - "Record(3, 20240101, 00)", - "Record(4, 20240101, 00)") + "Record(1, 1, 20240101, 00, NULL)", + "Record(2, 2, 20240101, 00, NULL)", + "Record(1, 1, 20240101, 01, NULL)", + "Record(2, 2, 20240101, 01, NULL)", + "Record(3, 3, 20240101, 00, 3)", + "Record(4, 4, 20240101, 00, 4)") + .collect(Collectors.toList())); + } + + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testReorderColumn(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); + + icebergTable.updateSchema().moveAfter("v", "hh").commit(); + Schema newIceSchema = icebergTable.schema(); + List addedRecords = + Stream.of( + toIcebergRecord(newIceSchema, 3, "20240101", "00", 3), + toIcebergRecord(newIceSchema, 4, "20240101", "00", 4)) + .collect(Collectors.toList()); + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); + } else { + writeRecordsToIceberg(icebergTable, format, addedRecords); + } + + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new 
Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + assertThat( + paiResults.stream() + .map(row -> String.format("Record(%s)", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + Stream.of( + "Record(1, 20240101, 00, 1)", + "Record(2, 20240101, 00, 2)", + "Record(1, 20240101, 01, 1)", + "Record(2, 20240101, 01, 2)", + "Record(3, 20240101, 00, 3)", + "Record(4, 20240101, 00, 4)") .collect(Collectors.toList())); } + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testUpdateColumn(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); + + icebergTable.updateSchema().updateColumn("v", Types.LongType.get()).commit(); + Schema newIceSchema = icebergTable.schema(); + List addedRecords = + Stream.of( + toIcebergRecord(newIceSchema, 3, 3L, "20240101", "00"), + toIcebergRecord(newIceSchema, 4, 3L, "20240101", "00")) + .collect(Collectors.toList()); + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); + } else { + writeRecordsToIceberg(icebergTable, format, addedRecords); + } + + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + assertThat( + paiResults.stream() + .map(row -> String.format("Record(%s)", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + Stream.of( + "Record(1, 1, 20240101, 00)", + "Record(2, 2, 20240101, 00)", + "Record(1, 1, 20240101, 01)", + "Record(2, 2, 20240101, 01)", + "Record(3, 3, 20240101, 00)", + "Record(4, 3, 20240101, 00)") + .collect(Collectors.toList())); + } + + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testMigrateWithRandomIcebergEvolution(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + icebergTable.updateSchema().addColumn("v2", Types.IntegerType.get()).commit(); + String format = "parquet"; + List index = new LinkedList<>(Arrays.asList("k", "v", "dt", "hh", "v2")); + + int numRounds = 20; + int numRecords = 10; + List ops = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5)); + ThreadLocalRandom random = ThreadLocalRandom.current(); + boolean isTypeChange = false; + List> expectRecords = new ArrayList<>(); + + for (int i = 0; i < numRounds; i++) { + List records = new ArrayList<>(); + String dt = Integer.toString(random.nextInt(20240101, 20240104)); + String hh = Integer.toString(random.nextInt(3)); + + if ((i + 1) % 4 == 0 && !ops.isEmpty()) { + switch (ops.remove(random.nextInt(ops.size()))) { + case 1: + icebergTable + .updateSchema() + .addColumn("v3", Types.IntegerType.get()) + .commit(); + for (List record : expectRecords) { + record.add("NULL"); + } + index.add("v3"); + break; + case 2: + icebergTable.updateSchema().renameColumn("v", "vv").commit(); + break; + case 3: + icebergTable.updateSchema().deleteColumn("v2").commit(); + int v2Idx = index.indexOf("v2"); + for (List record : expectRecords) { + 
record.remove(v2Idx); + } + index.remove(v2Idx); + break; + case 4: + icebergTable.updateSchema().moveAfter("k", "hh").commit(); + int kIdx = index.indexOf("k"); + int hhIdx = index.indexOf("hh"); + for (List record : expectRecords) { + String k = record.remove(kIdx); + record.add(hhIdx, k); + } + index.remove(kIdx); + index.add(hhIdx, "k"); + break; + case 5: + icebergTable + .updateSchema() + .updateColumn("k", Types.LongType.get()) + .commit(); + isTypeChange = true; + break; + default: + throw new IllegalStateException("Unknown operation"); + } + } + for (int j = 0; j < numRecords; j++) { + List recordString = new ArrayList<>(); + GenericRecord record = GenericRecord.create(icebergTable.schema()); + for (int idx = 0; idx < index.size(); idx++) { + String field = index.get(idx); + if (field.equals("dt")) { + record.set(idx, dt); + recordString.add(dt); + } else if (field.equals("hh")) { + record.set(idx, hh); + recordString.add(hh); + } else { + int value = random.nextInt(100); + if (field.equals("k") && isTypeChange) { + record.set(idx, (long) value); + } else { + record.set(idx, value); + } + recordString.add(String.valueOf(value)); + } + } + records.add(record); + expectRecords.add(recordString); + } + + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, records, dt, hh); + } else { + writeRecordsToIceberg(icebergTable, format, records); + } + } + + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + System.out.println(); + assertThat( + paiResults.stream() + .map(row -> String.format("[%s]", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + expectRecords.stream().map(List::toString).collect(Collectors.toList())); + } + @Test public void testAllDataTypes() throws Exception { Schema iceAllTypesSchema = @@ -490,6 +742,31 @@ private Table createIcebergTable(boolean isPartitioned, Schema icebergSchema) { } } + private void writeInitialData(Table icebergTable, String format, boolean isPartitioned) + throws IOException { + List records1 = + Stream.of( + toIcebergRecord(1, 1, "20240101", "00"), + toIcebergRecord(2, 2, "20240101", "00")) + .collect(Collectors.toList()); + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00"); + } else { + writeRecordsToIceberg(icebergTable, format, records1); + } + + List records2 = + Stream.of( + toIcebergRecord(1, 1, "20240101", "01"), + toIcebergRecord(2, 2, "20240101", "01")) + .collect(Collectors.toList()); + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01"); + } else { + writeRecordsToIceberg(icebergTable, format, records2); + } + } + private GenericRecord toIcebergRecord(Schema icebergSchema, Object... 
values) {
        GenericRecord record = GenericRecord.create(icebergSchema);
        for (int i = 0; i < values.length; i++) {

From 902bbd0a4cb5ae5d1b0cdaa6bc185e7f4e90a54e Mon Sep 17 00:00:00 2001
From: LsomeYeah
Date: Fri, 14 Feb 2025 13:51:34 +0800
Subject: [PATCH 2/2] [core] allow defining options for the target table

---
 .../iceberg/migrate/IcebergMigrator.java    | 19 ++++++++---
 .../iceberg/migrate/IcebergMigrateTest.java | 33 ++++++++++++-------
 .../flink/utils/TableMigrationUtils.java    |  3 +-
 3 files changed, 39 insertions(+), 16 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
index e2f56c437538..4aff0ded5f1d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.iceberg.migrate;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
@@ -71,6 +72,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
 
 /** migrate iceberg table to paimon table. */
@@ -82,6 +84,7 @@ public class IcebergMigrator implements Migrator {
     private final Catalog paimonCatalog;
     private final String paimonDatabaseName;
     private final String paimonTableName;
+    private final CoreOptions coreOptions;
 
     private final String icebergDatabaseName;
     private final String icebergTableName;
@@ -104,10 +107,18 @@ public IcebergMigrator(
             String icebergDatabaseName,
             String icebergTableName,
             Options icebergOptions,
-            Integer parallelism) {
+            Integer parallelism,
+            Map<String, String> options) {
         this.paimonCatalog = paimonCatalog;
         this.paimonDatabaseName = paimonDatabaseName;
         this.paimonTableName = paimonTableName;
+        this.coreOptions = new CoreOptions(options);
+        checkArgument(
+                coreOptions.bucket() == -1,
+                "Iceberg migrator only supports unaware-bucket target tables, bucket should be -1.");
+        checkArgument(
+                !options.containsKey(CoreOptions.PRIMARY_KEY.key()),
+                "Iceberg migrator does not support defining a primary key for the target table.");
 
         this.icebergDatabaseName = icebergDatabaseName;
         this.icebergTableName = icebergTableName;
@@ -277,7 +288,7 @@ public void renameTable(boolean ignoreIfNotExists) throws Exception {
         paimonCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists);
     }
 
-    public List<TableSchema> icebergSchemasToPaimonSchemas(IcebergMetadata icebergMetadata) {
+    private List<TableSchema> icebergSchemasToPaimonSchemas(IcebergMetadata icebergMetadata) {
         return icebergMetadata.schemas().stream()
                 .map(
                         icebergSchema -> {
@@ -291,7 +302,7 @@ public List icebergSchemasToPaimonSchemas(IcebergMetadata icebergMe
                 .collect(Collectors.toList());
     }
 
-    public Schema icebergSchemaToPaimonSchema(IcebergSchema icebergSchema) {
+    private Schema icebergSchemaToPaimonSchema(IcebergSchema icebergSchema) {
 
         // get iceberg current partition spec
         int currentPartitionSpecId = icebergMetadata.defaultSpecId();
@@ -331,7 +342,7 @@ private void checkAndFilterDataFiles(List icebergDataFileMe
         }
     }
 
-    public long getSchemaIdFromIcebergManifestFile(Path manifestPath, FileIO fileIO) {
+    private long getSchemaIdFromIcebergManifestFile(Path manifestPath, FileIO fileIO) {
 
         try (DataFileStream dataFileStream =
                 new
DataFileStream<>( diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java index 455ecbd57ca5..40b0611481d9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java @@ -130,7 +130,8 @@ public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception { iceDatabase, iceTable, new Options(icebergProperties), - 1); + 1, + Collections.emptyMap()); icebergMigrator.executeMigrate(); icebergMigrator.renameTable(false); @@ -172,7 +173,8 @@ public void testMigrateAddAndDelete(boolean isPartitioned) throws Exception { iceDatabase, iceTable, new Options(icebergProperties), - 1); + 1, + Collections.emptyMap()); icebergMigrator.executeMigrate(); FileStoreTable paimonTable = @@ -228,7 +230,8 @@ public void testMigrateWithDeleteFile(boolean isPartitioned) throws Exception { iceDatabase, iceTable, new Options(icebergProperties), - 1); + 1, + Collections.emptyMap()); assertThatThrownBy(icebergMigrator::executeMigrate) .rootCause() @@ -270,7 +273,8 @@ public void testMigrateWithRandomIcebergData(boolean isPartitioned) throws Excep iceDatabase, iceTable, new Options(icebergProperties), - 1); + 1, + Collections.emptyMap()); icebergMigrator.executeMigrate(); FileStoreTable paimonTable = @@ -314,7 +318,8 @@ public void testDeleteColumn(boolean isPartitioned) throws Exception { iceDatabase, iceTable, new Options(icebergProperties), - 1); + 1, + Collections.emptyMap()); icebergMigrator.executeMigrate(); FileStoreTable paimonTable = @@ -363,7 +368,8 @@ public void testRenameColumn(boolean isPartitioned) throws Exception { iceDatabase, iceTable, new Options(icebergProperties), - 1); + 1, + Collections.emptyMap()); icebergMigrator.executeMigrate(); FileStoreTable paimonTable = @@ -412,7 +418,8 @@ public void testAddColumn(boolean isPartitioned) throws Exception { iceDatabase, iceTable, new Options(icebergProperties), - 1); + 1, + Collections.emptyMap()); icebergMigrator.executeMigrate(); FileStoreTable paimonTable = @@ -461,7 +468,8 @@ public void testReorderColumn(boolean isPartitioned) throws Exception { iceDatabase, iceTable, new Options(icebergProperties), - 1); + 1, + Collections.emptyMap()); icebergMigrator.executeMigrate(); FileStoreTable paimonTable = @@ -510,7 +518,8 @@ public void testUpdateColumn(boolean isPartitioned) throws Exception { iceDatabase, iceTable, new Options(icebergProperties), - 1); + 1, + Collections.emptyMap()); icebergMigrator.executeMigrate(); FileStoreTable paimonTable = @@ -636,7 +645,8 @@ public void testMigrateWithRandomIcebergEvolution(boolean isPartitioned) throws iceDatabase, iceTable, new Options(icebergProperties), - 1); + 1, + Collections.emptyMap()); icebergMigrator.executeMigrate(); FileStoreTable paimonTable = @@ -696,7 +706,8 @@ public void testAllDataTypes() throws Exception { iceDatabase, iceTable, new Options(icebergProperties), - 1); + 1, + Collections.emptyMap()); icebergMigrator.executeMigrate(); FileStoreTable paimonTable = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java index 4e7268c6f14e..5c0338f893a6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java @@ -80,7 +80,8 @@ public static Migrator getIcebergImporter( sourceDatabase, sourceTableName, icebergConf, - parallelism); + parallelism, + options); } public static List getImporters(