diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java index d171962becad..cb78c3c646b5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java @@ -83,6 +83,9 @@ public static Content fromId(int id) { private final InternalMap lowerBounds; private final InternalMap upperBounds; + // only used for iceberg migrate + private long schemaId = 0; + IcebergDataFileMeta( Content content, String filePath, @@ -201,6 +204,15 @@ public InternalMap upperBounds() { return upperBounds; } + public long schemaId() { + return schemaId; + } + + public IcebergDataFileMeta withSchemaId(long schemaId) { + this.schemaId = schemaId; + return this; + } + public static RowType schema(RowType partitionType) { List fields = new ArrayList<>(); fields.add(new DataField(134, "content", DataTypes.INT().notNull())); diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java index 9e91fa2d18a8..e2f56c437538 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java @@ -43,19 +43,26 @@ import org.apache.paimon.migrate.Migrator; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.types.DataField; +import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -136,15 +143,28 @@ public IcebergMigrator( @Override public void executeMigrate() throws Exception { - Schema paimonSchema = icebergSchemaToPaimonSchema(icebergMetadata); + List paimonSchemas = icebergSchemasToPaimonSchemas(icebergMetadata); + Preconditions.checkArgument( + !paimonSchemas.isEmpty(), + "paimon schemas transformed from iceberg table is empty."); Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableName); paimonCatalog.createDatabase(paimonDatabaseName, true); - paimonCatalog.createTable(paimonIdentifier, paimonSchema, false); + TableSchema firstSchema = paimonSchemas.get(0); + Preconditions.checkArgument(firstSchema.id() == 0, "Unexpected, first schema id is not 0."); + paimonCatalog.createTable(paimonIdentifier, firstSchema.toSchema(), false); try { FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier); FileIO fileIO = paimonTable.fileIO(); + SchemaManager schemaManager = paimonTable.schemaManager(); + // commit all the iceberg schemas + for (int i = 1; i < paimonSchemas.size(); i++) { + LOG.info( + "commit new schema from iceberg, new schema 
id:{}", + paimonSchemas.get(i).id()); + schemaManager.commit(paimonSchemas.get(i)); + } IcebergManifestFile manifestFile = IcebergManifestFile.create(paimonTable, icebergMetaPathFactory); @@ -157,25 +177,36 @@ public void executeMigrate() throws Exception { // check manifest file with 'DELETE' kind checkAndFilterManifestFiles(icebergManifestFileMetas); - // get all live iceberg entries - List icebergEntries = - icebergManifestFileMetas.stream() - .flatMap(fileMeta -> manifestFile.read(fileMeta).stream()) - .filter(IcebergManifestEntry::isLive) - .collect(Collectors.toList()); - if (icebergEntries.isEmpty()) { + Map> icebergEntries = new HashMap<>(); + for (IcebergManifestFileMeta icebergManifestFileMeta : icebergManifestFileMetas) { + long schemaId = + getSchemaIdFromIcebergManifestFile( + new Path(icebergManifestFileMeta.manifestPath()), fileIO); + List entries = manifestFile.read(icebergManifestFileMeta); + icebergEntries + .computeIfAbsent(schemaId, v -> new ArrayList<>()) + .addAll( + entries.stream() + .filter(IcebergManifestEntry::isLive) + .collect(Collectors.toList())); + } + + List icebergDataFileMetas = new ArrayList<>(); + // write schema id to IcebergDataFileMeta + for (Map.Entry> kv : icebergEntries.entrySet()) { + icebergDataFileMetas.addAll( + kv.getValue().stream() + .map(entry -> entry.file().withSchemaId(kv.getKey())) + .collect(Collectors.toList())); + } + + if (icebergDataFileMetas.isEmpty()) { LOG.info( - "No live manifest entry in iceberg table for snapshot {}, iceberg table meta path is {}.", + "No live iceberg data files in iceberg table for snapshot {}, iceberg table meta path is {}.", icebergMetadata.currentSnapshotId(), icebergLatestMetadataLocation); return; } - - List icebergDataFileMetas = - icebergEntries.stream() - .map(IcebergManifestEntry::file) - .collect(Collectors.toList()); - // Again, check if delete File exists checkAndFilterDataFiles(icebergDataFileMetas); @@ -246,10 +277,21 @@ public void renameTable(boolean ignoreIfNotExists) throws Exception { paimonCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists); } - public Schema icebergSchemaToPaimonSchema(IcebergMetadata icebergMetadata) { - // get iceberg current schema - IcebergSchema icebergSchema = - icebergMetadata.schemas().get(icebergMetadata.currentSchemaId()); + public List icebergSchemasToPaimonSchemas(IcebergMetadata icebergMetadata) { + return icebergMetadata.schemas().stream() + .map( + icebergSchema -> { + LOG.info( + "Convert iceberg schema to paimon schema, iceberg schema id: {}", + icebergSchema.schemaId()); + return TableSchema.create( + icebergSchema.schemaId(), + icebergSchemaToPaimonSchema(icebergSchema)); + }) + .collect(Collectors.toList()); + } + + public Schema icebergSchemaToPaimonSchema(IcebergSchema icebergSchema) { // get iceberg current partition spec int currentPartitionSpecId = icebergMetadata.defaultSpecId(); @@ -289,6 +331,18 @@ private void checkAndFilterDataFiles(List icebergDataFileMe } } + public long getSchemaIdFromIcebergManifestFile(Path manifestPath, FileIO fileIO) { + + try (DataFileStream dataFileStream = + new DataFileStream<>( + fileIO.newInputStream(manifestPath), new GenericDatumReader<>())) { + String schema = dataFileStream.getMetaString("schema"); + return JsonSerdeUtil.fromJson(schema, IcebergSchema.class).schemaId(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private static List construct( List icebergDataFileMetas, FileIO fileIO, @@ -318,7 +372,9 @@ private static DataFileMeta constructFileMeta( e); } String 
format = icebergDataFileMeta.fileFormat(); - return FileMetaUtils.constructFileMeta(format, status, fileIO, table, dir, rollback); + long schemaId = icebergDataFileMeta.schemaId(); + return FileMetaUtils.constructFileMeta( + format, status, fileIO, table, dir, rollback, schemaId); } private MigrateTask importUnPartitionedTable( diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java index 366f8afcfd38..405870d5fa03 100644 --- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java @@ -129,6 +129,47 @@ public static DataFileMeta constructFileMeta( } } + public static DataFileMeta constructFileMeta( + String format, + FileStatus fileStatus, + FileIO fileIO, + Table table, + Path dir, + Map rollback, + long schemaId) { + + try { + RowType rowTypeWithSchemaId = + ((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType(); + SimpleColStatsCollector.Factory[] factories = + StatsCollectorFactories.createStatsFactories( + ((FileStoreTable) table).coreOptions(), + rowTypeWithSchemaId.getFieldNames()); + + SimpleStatsExtractor simpleStatsExtractor = + FileFormat.fromIdentifier( + format, + ((FileStoreTable) table).coreOptions().toConfiguration()) + .createStatsExtractor(rowTypeWithSchemaId, factories) + .orElseThrow( + () -> + new RuntimeException( + "Can't get table stats extractor for format " + + format)); + Path newPath = renameFile(fileIO, fileStatus.getPath(), dir, format, rollback); + return constructFileMeta( + newPath.getName(), + fileStatus.getLen(), + newPath, + simpleStatsExtractor, + fileIO, + table, + schemaId); + } catch (IOException e) { + throw new RuntimeException("error when construct file meta", e); + } + } + // -----------------------------private method--------------------------------------------- private static Path renameFile( @@ -152,7 +193,29 @@ private static DataFileMeta constructFileMeta( FileIO fileIO, Table table) throws IOException { - SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(table.rowType()); + return constructFileMeta( + fileName, + fileSize, + path, + simpleStatsExtractor, + fileIO, + table, + ((FileStoreTable) table).schema().id()); + } + + private static DataFileMeta constructFileMeta( + String fileName, + long fileSize, + Path path, + SimpleStatsExtractor simpleStatsExtractor, + FileIO fileIO, + Table table, + long schemaId) + throws IOException { + RowType rowTypeWithSchemaId = + ((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType(); + + SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(rowTypeWithSchemaId); Pair fileInfo = simpleStatsExtractor.extractWithFileInfo(fileIO, path); @@ -165,7 +228,7 @@ private static DataFileMeta constructFileMeta( stats, 0, 0, - ((FileStoreTable) table).schema().id(), + schemaId, Collections.emptyList(), null, FileSource.APPEND, diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 753bc34d95ef..d15312fce1f7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -769,7 +769,7 @@ protected void updateLastColumn(List newFields, String fieldName) } @VisibleForTesting - boolean commit(TableSchema newSchema) throws Exception { + public boolean 
commit(TableSchema newSchema) throws Exception { SchemaValidation.validateTableSchema(newSchema); SchemaValidation.validateFallbackBranch(this, newSchema); Path schemaPath = toSchemaPath(newSchema.id()); diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java index aadaca0c3854..455ecbd57ca5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java @@ -64,8 +64,10 @@ import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; @@ -118,27 +120,7 @@ public void beforeEach() throws Exception { public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception { Table icebergTable = createIcebergTable(isPartitioned); String format = "parquet"; - List records1 = - Stream.of( - toIcebergRecord(1, 1, "20240101", "00"), - toIcebergRecord(2, 2, "20240101", "00")) - .collect(Collectors.toList()); - if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00"); - } else { - writeRecordsToIceberg(icebergTable, format, records1); - } - - List records2 = - Stream.of( - toIcebergRecord(1, 1, "20240101", "01"), - toIcebergRecord(2, 2, "20240101", "01")) - .collect(Collectors.toList()); - if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01"); - } else { - writeRecordsToIceberg(icebergTable, format, records2); - } + writeInitialData(icebergTable, format, isPartitioned); IcebergMigrator icebergMigrator = new IcebergMigrator( @@ -160,8 +142,11 @@ public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception { .map(row -> String.format("Record(%s)", row)) .collect(Collectors.toList())) .hasSameElementsAs( - Stream.concat(records1.stream(), records2.stream()) - .map(GenericRecord::toString) + Stream.of( + "Record(1, 1, 20240101, 00)", + "Record(2, 2, 20240101, 00)", + "Record(1, 1, 20240101, 01)", + "Record(2, 2, 20240101, 01)") .collect(Collectors.toList())); // verify iceberg table has been deleted @@ -173,27 +158,7 @@ public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception { public void testMigrateAddAndDelete(boolean isPartitioned) throws Exception { Table icebergTable = createIcebergTable(isPartitioned); String format = "parquet"; - List records1 = - Stream.of( - toIcebergRecord(1, 1, "20240101", "00"), - toIcebergRecord(2, 2, "20240101", "00")) - .collect(Collectors.toList()); - if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00"); - } else { - writeRecordsToIceberg(icebergTable, format, records1); - } - - List records2 = - Stream.of( - toIcebergRecord(1, 1, "20240101", "01"), - toIcebergRecord(2, 2, "20240101", "01")) - .collect(Collectors.toList()); - if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01"); - } else { - writeRecordsToIceberg(icebergTable, format, records2); - } + writeInitialData(icebergTable, format, isPartitioned); // the file written with records2 will be deleted and generate a delete manifest entry, not // a delete file @@ -218,8 +183,7 @@ public void testMigrateAddAndDelete(boolean isPartitioned) throws Exception { .map(row -> 
String.format("Record(%s)", row)) .collect(Collectors.toList())) .hasSameElementsAs( - records2.stream() - .map(GenericRecord::toString) + Stream.of("Record(1, 1, 20240101, 01)", "Record(2, 2, 20240101, 01)") .collect(Collectors.toList())); } @@ -324,45 +288,115 @@ public void testMigrateWithRandomIcebergData(boolean isPartitioned) throws Excep @ParameterizedTest(name = "isPartitioned = {0}") @ValueSource(booleans = {true, false}) - public void testMigrateWithSchemaEvolution(boolean isPartitioned) throws Exception { + public void testDeleteColumn(boolean isPartitioned) throws Exception { Table icebergTable = createIcebergTable(isPartitioned); String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); - // write base data - List records1 = + icebergTable.updateSchema().deleteColumn("v").commit(); + Schema newIceSchema = icebergTable.schema(); + List addedRecords = Stream.of( - toIcebergRecord(1, 1, "20240101", "00"), - toIcebergRecord(2, 2, "20240101", "00")) + toIcebergRecord(3, "20240101", "00", newIceSchema), + toIcebergRecord(4, "20240101", "00", newIceSchema)) .collect(Collectors.toList()); if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00"); + writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); } else { - writeRecordsToIceberg(icebergTable, format, records1); + writeRecordsToIceberg(icebergTable, format, addedRecords); } - List records2 = + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + assertThat( + paiResults.stream() + .map(row -> String.format("Record(%s)", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + Stream.of( + "Record(1, 20240101, 00)", + "Record(2, 20240101, 00)", + "Record(1, 20240101, 01)", + "Record(2, 20240101, 01)", + "Record(3, 20240101, 00)", + "Record(4, 20240101, 00)") + .collect(Collectors.toList())); + } + + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testRenameColumn(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); + + icebergTable.updateSchema().renameColumn("v", "v2").commit(); + Schema newIceSchema = icebergTable.schema(); + List addedRecords = Stream.of( - toIcebergRecord(1, 1, "20240101", "01"), - toIcebergRecord(2, 2, "20240101", "01")) + toIcebergRecord(newIceSchema, 3, 3, "20240101", "00"), + toIcebergRecord(newIceSchema, 4, 4, "20240101", "00")) .collect(Collectors.toList()); if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01"); + writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); } else { - writeRecordsToIceberg(icebergTable, format, records2); + writeRecordsToIceberg(icebergTable, format, addedRecords); } - // TODO: currently only support schema evolution of deleting columns - testDeleteColumn(icebergTable, format, isPartitioned); + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + 
(FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + assertThat( + paiResults.stream() + .map(row -> String.format("Record(%s)", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + Stream.of( + "Record(1, 1, 20240101, 00)", + "Record(2, 2, 20240101, 00)", + "Record(1, 1, 20240101, 01)", + "Record(2, 2, 20240101, 01)", + "Record(3, 3, 20240101, 00)", + "Record(4, 4, 20240101, 00)") + .collect(Collectors.toList())); } - private void testDeleteColumn(Table icebergTable, String format, boolean isPartitioned) - throws Exception { - icebergTable.updateSchema().deleteColumn("v").commit(); + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testAddColumn(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); + + icebergTable.updateSchema().addColumn("v2", Types.IntegerType.get()).commit(); Schema newIceSchema = icebergTable.schema(); List addedRecords = Stream.of( - toIcebergRecord(3, "20240101", "00", newIceSchema), - toIcebergRecord(4, "20240101", "00", newIceSchema)) + toIcebergRecord(newIceSchema, 3, 3, "20240101", "00", 3), + toIcebergRecord(newIceSchema, 4, 4, "20240101", "00", 4)) .collect(Collectors.toList()); if (isPartitioned) { writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); @@ -390,15 +424,233 @@ private void testDeleteColumn(Table icebergTable, String format, boolean isParti .collect(Collectors.toList())) .hasSameElementsAs( Stream.of( - "Record(1, 20240101, 00)", - "Record(2, 20240101, 00)", - "Record(1, 20240101, 01)", - "Record(2, 20240101, 01)", - "Record(3, 20240101, 00)", - "Record(4, 20240101, 00)") + "Record(1, 1, 20240101, 00, NULL)", + "Record(2, 2, 20240101, 00, NULL)", + "Record(1, 1, 20240101, 01, NULL)", + "Record(2, 2, 20240101, 01, NULL)", + "Record(3, 3, 20240101, 00, 3)", + "Record(4, 4, 20240101, 00, 4)") + .collect(Collectors.toList())); + } + + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testReorderColumn(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); + + icebergTable.updateSchema().moveAfter("v", "hh").commit(); + Schema newIceSchema = icebergTable.schema(); + List addedRecords = + Stream.of( + toIcebergRecord(newIceSchema, 3, "20240101", "00", 3), + toIcebergRecord(newIceSchema, 4, "20240101", "00", 4)) + .collect(Collectors.toList()); + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); + } else { + writeRecordsToIceberg(icebergTable, format, addedRecords); + } + + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + assertThat( + paiResults.stream() + .map(row -> String.format("Record(%s)", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + Stream.of( + "Record(1, 20240101, 00, 1)", + "Record(2, 20240101, 00, 2)", + "Record(1, 20240101, 01, 1)", + "Record(2, 20240101, 01, 2)", + "Record(3, 
20240101, 00, 3)", + "Record(4, 20240101, 00, 4)") .collect(Collectors.toList())); } + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testUpdateColumn(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); + + icebergTable.updateSchema().updateColumn("v", Types.LongType.get()).commit(); + Schema newIceSchema = icebergTable.schema(); + List addedRecords = + Stream.of( + toIcebergRecord(newIceSchema, 3, 3L, "20240101", "00"), + toIcebergRecord(newIceSchema, 4, 3L, "20240101", "00")) + .collect(Collectors.toList()); + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); + } else { + writeRecordsToIceberg(icebergTable, format, addedRecords); + } + + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + assertThat( + paiResults.stream() + .map(row -> String.format("Record(%s)", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + Stream.of( + "Record(1, 1, 20240101, 00)", + "Record(2, 2, 20240101, 00)", + "Record(1, 1, 20240101, 01)", + "Record(2, 2, 20240101, 01)", + "Record(3, 3, 20240101, 00)", + "Record(4, 3, 20240101, 00)") + .collect(Collectors.toList())); + } + + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testMigrateWithRandomIcebergEvolution(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + icebergTable.updateSchema().addColumn("v2", Types.IntegerType.get()).commit(); + String format = "parquet"; + List index = new LinkedList<>(Arrays.asList("k", "v", "dt", "hh", "v2")); + + int numRounds = 20; + int numRecords = 10; + List ops = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5)); + ThreadLocalRandom random = ThreadLocalRandom.current(); + boolean isTypeChange = false; + List> expectRecords = new ArrayList<>(); + + for (int i = 0; i < numRounds; i++) { + List records = new ArrayList<>(); + String dt = Integer.toString(random.nextInt(20240101, 20240104)); + String hh = Integer.toString(random.nextInt(3)); + + if ((i + 1) % 4 == 0 && !ops.isEmpty()) { + switch (ops.remove(random.nextInt(ops.size()))) { + case 1: + icebergTable + .updateSchema() + .addColumn("v3", Types.IntegerType.get()) + .commit(); + for (List record : expectRecords) { + record.add("NULL"); + } + index.add("v3"); + break; + case 2: + icebergTable.updateSchema().renameColumn("v", "vv").commit(); + break; + case 3: + icebergTable.updateSchema().deleteColumn("v2").commit(); + int v2Idx = index.indexOf("v2"); + for (List record : expectRecords) { + record.remove(v2Idx); + } + index.remove(v2Idx); + break; + case 4: + icebergTable.updateSchema().moveAfter("k", "hh").commit(); + int kIdx = index.indexOf("k"); + int hhIdx = index.indexOf("hh"); + for (List record : expectRecords) { + String k = record.remove(kIdx); + record.add(hhIdx, k); + } + index.remove(kIdx); + index.add(hhIdx, "k"); + break; + case 5: + icebergTable + .updateSchema() + .updateColumn("k", Types.LongType.get()) + .commit(); + isTypeChange = true; + break; + default: + throw new 
IllegalStateException("Unknown operation"); + } + } + for (int j = 0; j < numRecords; j++) { + List recordString = new ArrayList<>(); + GenericRecord record = GenericRecord.create(icebergTable.schema()); + for (int idx = 0; idx < index.size(); idx++) { + String field = index.get(idx); + if (field.equals("dt")) { + record.set(idx, dt); + recordString.add(dt); + } else if (field.equals("hh")) { + record.set(idx, hh); + recordString.add(hh); + } else { + int value = random.nextInt(100); + if (field.equals("k") && isTypeChange) { + record.set(idx, (long) value); + } else { + record.set(idx, value); + } + recordString.add(String.valueOf(value)); + } + } + records.add(record); + expectRecords.add(recordString); + } + + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, records, dt, hh); + } else { + writeRecordsToIceberg(icebergTable, format, records); + } + } + + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + System.out.println(); + assertThat( + paiResults.stream() + .map(row -> String.format("[%s]", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + expectRecords.stream().map(List::toString).collect(Collectors.toList())); + } + @Test public void testAllDataTypes() throws Exception { Schema iceAllTypesSchema = @@ -490,6 +742,31 @@ private Table createIcebergTable(boolean isPartitioned, Schema icebergSchema) { } } + private void writeInitialData(Table icebergTable, String format, boolean isPartitioned) + throws IOException { + List records1 = + Stream.of( + toIcebergRecord(1, 1, "20240101", "00"), + toIcebergRecord(2, 2, "20240101", "00")) + .collect(Collectors.toList()); + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00"); + } else { + writeRecordsToIceberg(icebergTable, format, records1); + } + + List records2 = + Stream.of( + toIcebergRecord(1, 1, "20240101", "01"), + toIcebergRecord(2, 2, "20240101", "01")) + .collect(Collectors.toList()); + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01"); + } else { + writeRecordsToIceberg(icebergTable, format, records2); + } + } + private GenericRecord toIcebergRecord(Schema icebergSchema, Object... values) { GenericRecord record = GenericRecord.create(icebergSchema); for (int i = 0; i < values.length; i++) { diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java new file mode 100644 index 000000000000..0402d3e8961b --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.utils.TableMigrationUtils; +import org.apache.paimon.migrate.Migrator; +import org.apache.paimon.utils.ParameterUtils; + +import org.apache.flink.table.procedure.ProcedureContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Migrate procedure to migrate iceberg table to paimon table. */ +public class MigrateIcebergTableProcedure extends ProcedureBase { + + private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class); + + private static final String PAIMON_SUFFIX = "_paimon_"; + + @Override + public String identifier() { + return "migrate_iceberg_table"; + } + + public String[] call( + ProcedureContext procedureContext, String sourceTablePath, String icebergProperties) + throws Exception { + + return call(procedureContext, sourceTablePath, icebergProperties, ""); + } + + public String[] call( + ProcedureContext procedureContext, + String sourceTablePath, + String icebergProperties, + String properties) + throws Exception { + + return call( + procedureContext, + sourceTablePath, + icebergProperties, + properties, + Runtime.getRuntime().availableProcessors()); + } + + public String[] call( + ProcedureContext procedureContext, + String sourceTablePath, + String icebergProperties, + String properties, + Integer parallelism) + throws Exception { + String targetTablePath = sourceTablePath + PAIMON_SUFFIX; + + Identifier sourceTableId = Identifier.fromString(sourceTablePath); + Identifier targetTableId = Identifier.fromString(targetTablePath); + + Migrator migrator = + TableMigrationUtils.getIcebergImporter( + catalog, + sourceTableId.getDatabaseName(), + sourceTableId.getObjectName(), + targetTableId.getDatabaseName(), + targetTableId.getObjectName(), + parallelism, + ParameterUtils.parseCommaSeparatedKeyValues(properties), + ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + LOG.info("create migrator success."); + migrator.executeMigrate(); + + migrator.renameTable(false); + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index 196528d31c78..8778b9d5e187 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.utils.TableMigrationUtils; +import org.apache.paimon.migrate.Migrator; import org.apache.paimon.utils.ParameterUtils; import org.apache.flink.table.procedure.ProcedureContext; @@ -50,25 +51,13 @@ public String[] call( String sourceTablePath, String properties) throws Exception { - String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; - Identifier sourceTableId = 
Identifier.fromString(sourceTablePath); - Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); - - TableMigrationUtils.getImporter( - connector, - catalog, - sourceTableId.getDatabaseName(), - sourceTableId.getObjectName(), - targetTableId.getDatabaseName(), - targetTableId.getObjectName(), - Runtime.getRuntime().availableProcessors(), - ParameterUtils.parseCommaSeparatedKeyValues(properties)) - .executeMigrate(); - - LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId); - catalog.renameTable(targetTableId, sourceTableId, false); - return new String[] {"Success"}; + return call( + procedureContext, + connector, + sourceTablePath, + properties, + Runtime.getRuntime().availableProcessors()); } public String[] call( @@ -78,12 +67,13 @@ public String[] call( String properties, Integer parallelism) throws Exception { - String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; + String targetTablePath = sourceTablePath + PAIMON_SUFFIX; Identifier sourceTableId = Identifier.fromString(sourceTablePath); - Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); + Identifier targetTableId = Identifier.fromString(targetTablePath); - TableMigrationUtils.getImporter( + Migrator migrator = + TableMigrationUtils.getImporter( connector, catalog, sourceTableId.getDatabaseName(), @@ -91,11 +81,11 @@ public String[] call( targetTableId.getDatabaseName(), targetTableId.getObjectName(), parallelism, - ParameterUtils.parseCommaSeparatedKeyValues(properties)) - .executeMigrate(); + ParameterUtils.parseCommaSeparatedKeyValues(properties)); + LOG.info("create migrator success."); + migrator.executeMigrate(); - LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId); - catalog.renameTable(targetTableId, sourceTableId, false); + migrator.renameTable(false); return new String[] {"Success"}; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java new file mode 100644 index 000000000000..1b9fcb46a9e9 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure; + +import org.apache.flink.table.procedure.DefaultProcedureContext; + +import java.util.Map; + +/** Migrate from iceberg table to paimon table. 
*/ +public class MigrateIcebergTableAction extends ActionBase { + + private final String sourceTableFullName; + private final String tableProperties; + private final Integer parallelism; + + private final String icebergProperties; + + public MigrateIcebergTableAction( + String sourceTableFullName, + Map catalogConfig, + String icebergProperties, + String tableProperties, + Integer parallelism) { + super(catalogConfig); + this.sourceTableFullName = sourceTableFullName; + this.tableProperties = tableProperties; + this.parallelism = parallelism; + this.icebergProperties = icebergProperties; + } + + @Override + public void run() throws Exception { + MigrateIcebergTableProcedure migrateIcebergTableProcedure = + new MigrateIcebergTableProcedure(); + migrateIcebergTableProcedure.withCatalog(catalog); + migrateIcebergTableProcedure.call( + new DefaultProcedureContext(env), + sourceTableFullName, + icebergProperties, + tableProperties, + parallelism); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java new file mode 100644 index 000000000000..c85559d66b41 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import java.util.Map; +import java.util.Optional; + +/** Action Factory for {@link MigrateIcebergTableAction}. */ +public class MigrateIcebergTableActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "migrate_iceberg_table"; + + private static final String OPTIONS = "options"; + private static final String PARALLELISM = "parallelism"; + + private static final String ICEBERG_OPTIONS = "iceberg_options"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + + String sourceTable = params.get(TABLE); + Map catalogConfig = catalogConfigMap(params); + String tableConf = params.get(OPTIONS); + Integer parallelism = + params.get(PARALLELISM) == null ? 
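For reference, the action introduced above can also be driven programmatically rather than through the CLI factory; below is a condensed sketch modeled on the integration test near the end of this patch, with the source table name, metastore URI, warehouse path, and parallelism as placeholder values:

import org.apache.paimon.flink.action.MigrateIcebergTableAction;

import java.util.HashMap;
import java.util.Map;

public class MigrateIcebergActionExample {

    public static void main(String[] args) throws Exception {
        // Configuration of the Paimon catalog that will hold the migrated table.
        Map<String, String> catalogConf = new HashMap<>();
        catalogConf.put("warehouse", "/tmp/paimon-warehouse");
        catalogConf.put("metastore", "hive");
        catalogConf.put("uri", "thrift://localhost:9083");
        catalogConf.put("cache-enabled", "false");

        // Tells the migrator where the source Iceberg table's metadata lives.
        String icebergOptions =
                "metadata.iceberg.storage=hive-catalog,"
                        + "metadata.iceberg.uri=thrift://localhost:9083";

        new MigrateIcebergTableAction(
                        "default.iceberg_orders", // hypothetical source table
                        catalogConf,
                        icebergOptions,
                        "", // no extra table options
                        4) // parallelism
                .run();
    }
}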
null : Integer.parseInt(params.get(PARALLELISM)); + + String icebergOptions = params.get(ICEBERG_OPTIONS); + + MigrateIcebergTableAction migrateIcebergTableAction = + new MigrateIcebergTableAction( + sourceTable, catalogConfig, icebergOptions, tableConf, parallelism); + return Optional.of(migrateIcebergTableAction); + } + + @Override + public void printHelp() { + System.out.println( + "Action \"migrate_iceberg_table\" runs a migrating job from iceberg to paimon."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " migrate_iceberg_table" + + "--table " + + "--iceberg_options =[,=,...]" + + "[--catalog_conf ==,=,...]"); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java new file mode 100644 index 000000000000..f43d29ed4f17 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.utils.TableMigrationUtils; +import org.apache.paimon.migrate.Migrator; +import org.apache.paimon.utils.ParameterUtils; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Migrate procedure to migrate iceberg table to paimon table. 
*/ +public class MigrateIcebergTableProcedure extends ProcedureBase { + private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class); + + private static final String PAIMON_SUFFIX = "_paimon_"; + + @Override + public String identifier() { + return "migrate_iceberg_table"; + } + + @ProcedureHint( + argument = { + @ArgumentHint(name = "source_table", type = @DataTypeHint("STRING")), + @ArgumentHint( + name = "iceberg_options", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true), + @ArgumentHint( + name = "parallelism", + type = @DataTypeHint("Integer"), + isOptional = true) + }) + public String[] call( + ProcedureContext procedureContext, + String sourceTablePath, + String icebergProperties, + String properties, + Integer parallelism) + throws Exception { + properties = notnull(properties); + icebergProperties = notnull(icebergProperties); + + String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; + + Identifier sourceTableId = Identifier.fromString(sourceTablePath); + Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); + + Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism; + + Migrator migrator = + TableMigrationUtils.getIcebergImporter( + catalog, + sourceTableId.getDatabaseName(), + sourceTableId.getObjectName(), + targetTableId.getDatabaseName(), + targetTableId.getObjectName(), + p, + ParameterUtils.parseCommaSeparatedKeyValues(properties), + ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + LOG.info("create migrator success."); + migrator.executeMigrate(); + + migrator.renameTable(false); + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index fff05a1a8555..32a2a16dc51d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.utils.TableMigrationUtils; +import org.apache.paimon.migrate.Migrator; import org.apache.paimon.utils.ParameterUtils; import org.apache.flink.table.annotation.ArgumentHint; @@ -67,7 +68,8 @@ public String[] call( Integer p = parallelism == null ? 
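Once this procedure is registered in the factory service file, it can be invoked from Flink SQL with named or positional arguments. A usage sketch mirroring the integration test at the end of this patch, with the catalog name, table name, and warehouse path as placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MigrateIcebergProcedureExample {

    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // 'my_paimon' is assumed to be a Paimon catalog registered beforehand.
        tEnv.executeSql("USE CATALOG my_paimon");

        // Named-argument form; iceberg_options points the migrator at the source
        // table's metadata (hadoop-catalog variant shown here).
        tEnv.executeSql(
                        "CALL sys.migrate_iceberg_table("
                                + "source_table => 'default.iceberg_orders', "
                                + "iceberg_options => 'metadata.iceberg.storage=hadoop-catalog,"
                                + "iceberg_warehouse=/tmp/iceberg-warehouse')")
                .await();
    }
}

The positional form used elsewhere in the tests, CALL sys.migrate_iceberg_table('default.iceberg_orders', '...'), is equivalent; after the call the migrated Paimon table ends up under the original table name because the procedure finishes with migrator.renameTable(false).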
Runtime.getRuntime().availableProcessors() : parallelism; - TableMigrationUtils.getImporter( + Migrator migrator = + TableMigrationUtils.getImporter( connector, catalog, sourceTableId.getDatabaseName(), @@ -75,11 +77,11 @@ public String[] call( targetTableId.getDatabaseName(), targetTableId.getObjectName(), p, - ParameterUtils.parseCommaSeparatedKeyValues(properties)) - .executeMigrate(); + ParameterUtils.parseCommaSeparatedKeyValues(properties)); + LOG.info("create migrator success."); + migrator.executeMigrate(); - LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId); - catalog.renameTable(targetTableId, sourceTableId, false); + migrator.renameTable(false); return new String[] {"Success"}; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java index b59c3592a97d..4e7268c6f14e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java @@ -22,7 +22,9 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.hive.HiveCatalog; import org.apache.paimon.hive.migrate.HiveMigrator; +import org.apache.paimon.iceberg.migrate.IcebergMigrator; import org.apache.paimon.migrate.Migrator; +import org.apache.paimon.options.Options; import java.util.List; import java.util.Map; @@ -60,6 +62,27 @@ public static Migrator getImporter( } } + public static Migrator getIcebergImporter( + Catalog catalog, + String sourceDatabase, + String sourceTableName, + String targetDatabase, + String targetTableName, + Integer parallelism, + Map options, + Map icebergOptions) { + + Options icebergConf = new Options(icebergOptions); + return new IcebergMigrator( + catalog, + targetDatabase, + targetTableName, + sourceDatabase, + sourceTableName, + icebergConf, + parallelism); + } + public static List getImporters( String connector, Catalog catalog, diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 6f6becf85fc7..efaa25627d69 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -85,4 +85,5 @@ org.apache.paimon.flink.procedure.CloneProcedure org.apache.paimon.flink.procedure.CompactManifestProcedure org.apache.paimon.flink.procedure.RefreshObjectTableProcedure org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure -org.apache.paimon.flink.procedure.ClearConsumersProcedure \ No newline at end of file +org.apache.paimon.flink.procedure.ClearConsumersProcedure +org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java new file mode 100644 index 000000000000..3c0d7da024a0 --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.migrate; + +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.client.ClientPool; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.hive.HiveCatalog; +import org.apache.paimon.hive.pool.CachedClientPool; +import org.apache.paimon.iceberg.IcebergOptions; +import org.apache.paimon.iceberg.metadata.IcebergMetadata; +import org.apache.paimon.options.Options; +import org.apache.paimon.utils.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Get iceberg table latest snapshot metadata in hive. */ +public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata { + private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrateHiveMetadata.class); + + public static final String TABLE_TYPE_PROP = "table_type"; + public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg"; + private static final String ICEBERG_METADATA_LOCATION = "metadata_location"; + + private FileIO fileIO; + private final Options icebergOptions; + private final Identifier icebergIdentifier; + + private final ClientPool clients; + + private String metadataLocation = null; + + private IcebergMetadata icebergMetadata; + + public IcebergMigrateHiveMetadata(Identifier icebergIdentifier, Options icebergOptions) { + + this.icebergIdentifier = icebergIdentifier; + this.icebergOptions = icebergOptions; + + String uri = icebergOptions.get(IcebergOptions.URI); + String hiveConfDir = icebergOptions.get(IcebergOptions.HIVE_CONF_DIR); + String hadoopConfDir = icebergOptions.get(IcebergOptions.HADOOP_CONF_DIR); + Configuration hadoopConf = new Configuration(); + hadoopConf.setClassLoader(IcebergMigrateHiveMetadata.class.getClassLoader()); + HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, hadoopConf); + + icebergOptions.toMap().forEach(hiveConf::set); + if (uri != null) { + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri); + } + + if (hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname) == null) { + LOG.error( + "Can't find hive metastore uri to connect: " + + "either set {} in iceberg options or set hive.metastore.uris " + + "in hive-site.xml or hadoop configurations. " + + "Will use empty metastore uris, which means we may use a embedded metastore. 
" + + "Please make sure hive metastore uri for iceberg table is correctly set as expected.", + IcebergOptions.URI.key()); + } + + this.clients = + new CachedClientPool( + hiveConf, + icebergOptions, + icebergOptions.getString(IcebergOptions.HIVE_CLIENT_CLASS)); + } + + @Override + public IcebergMetadata icebergMetadata() { + try { + boolean isExist = tableExists(icebergIdentifier); + if (!isExist) { + throw new RuntimeException( + String.format( + "iceberg table %s is not existed in hive metastore", + icebergIdentifier)); + } + Table icebergHiveTable = + clients.run( + client -> + client.getTable( + icebergIdentifier.getDatabaseName(), + icebergIdentifier.getTableName())); + // check whether it is an iceberg table + String tableType = icebergHiveTable.getParameters().get(TABLE_TYPE_PROP); + Preconditions.checkArgument( + tableType != null && tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE), + "not an iceberg table: %s (table-type=%s)", + icebergIdentifier.toString(), + tableType); + + metadataLocation = icebergHiveTable.getParameters().get(ICEBERG_METADATA_LOCATION); + LOG.info("iceberg latest metadata location: {}", metadataLocation); + + fileIO = FileIO.get(new Path(metadataLocation), CatalogContext.create(icebergOptions)); + + icebergMetadata = IcebergMetadata.fromPath(fileIO, new Path(metadataLocation)); + return icebergMetadata; + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to read Iceberg metadata from path %s", metadataLocation), + e); + } + } + + @Override + public String icebergLatestMetadataLocation() { + return metadataLocation; + } + + @Override + public void deleteOriginTable() { + LOG.info("Iceberg table in hive to be deleted:{}", icebergIdentifier.toString()); + try { + clients.run( + client -> { + client.dropTable( + icebergIdentifier.getDatabaseName(), + icebergIdentifier.getTableName(), + true, + true); + return null; + }); + + // iceberg table in hive is external table, client.dropTable only deletes the metadata + // of iceberg table in hive, so we manually delete the data files + Path icebergTablePath = new Path(icebergMetadata.location()); + + if (fileIO.exists(icebergTablePath) && fileIO.isDir(icebergTablePath)) { + fileIO.deleteDirectoryQuietly(icebergTablePath); + } + } catch (Exception e) { + LOG.warn("exception occurred when deleting origin table", e); + } + } + + private boolean tableExists(Identifier identifier) throws Exception { + return clients.run( + client -> + client.tableExists( + identifier.getDatabaseName(), identifier.getTableName())); + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java new file mode 100644 index 000000000000..0a539cdec2d2 --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.migrate; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.iceberg.IcebergOptions; +import org.apache.paimon.options.Options; + +/** Factory to create {@link IcebergMigrateHiveMetadata}. */ +public class IcebergMigrateHiveMetadataFactory implements IcebergMigrateMetadataFactory { + @Override + public String identifier() { + return IcebergOptions.StorageType.HIVE_CATALOG.toString() + "_migrate"; + } + + @Override + public IcebergMigrateHiveMetadata create(Identifier icebergIdentifier, Options icebergOptions) { + return new IcebergMigrateHiveMetadata(icebergIdentifier, icebergOptions); + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 26f0944d916e..608f034659ca 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -16,3 +16,4 @@ org.apache.paimon.hive.HiveCatalogFactory org.apache.paimon.hive.HiveCatalogLockFactory org.apache.paimon.iceberg.IcebergHiveMetadataCommitterFactory +org.apache.paimon.iceberg.migrate.IcebergMigrateHiveMetadataFactory diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml index 397dfc942185..a79f2002ea24 100644 --- a/paimon-hive/paimon-hive-connector-common/pom.xml +++ b/paimon-hive/paimon-hive-connector-common/pom.xml @@ -562,6 +562,25 @@ under the License. + + org.apache.iceberg + iceberg-flink-${iceberg.flink.version} + ${iceberg.version} + test + + + org.apache.orc + orc-core + + + + + org.apache.flink + flink-metrics-dropwizard + ${iceberg.flink.dropwizard.version} + test + + junit junit diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java new file mode 100644 index 000000000000..1875b08eba22 --- /dev/null +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.procedure;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.MigrateIcebergTableAction;
+import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Tests for {@link MigrateIcebergTableProcedure}. */
+public class MigrateIcebergTableProcedureITCase extends ActionITCaseBase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MigrateIcebergTableProcedureITCase.class);
+
+    private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();
+
+    private static final int PORT = 9087;
+
+    @TempDir java.nio.file.Path iceTempDir;
+    @TempDir java.nio.file.Path paiTempDir;
+
+    @BeforeEach
+    public void beforeEach() {
+        TEST_HIVE_METASTORE.start(PORT);
+    }
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        TEST_HIVE_METASTORE.stop();
+    }
+
+    @Test
+    public void testMigrateIcebergTableProcedure() throws Exception {
+        TableEnvironment tEnv =
+                TableEnvironmentImpl.create(
+                        EnvironmentSettings.newInstance().inBatchMode().build());
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        boolean isPartitioned = random.nextBoolean();
+        boolean icebergIsHive = random.nextBoolean();
+        boolean paimonIsHive = random.nextBoolean();
+        boolean isNamedArgument = random.nextBoolean();
+
+        // log the randomly chosen arguments for debugging
+        LOG.info(
+                "isPartitioned:{}, icebergIsHive:{}, paimonIsHive:{}, isNamedArgument:{}",
+                isPartitioned,
+                icebergIsHive,
+                paimonIsHive,
+                isNamedArgument);
+
+        // create iceberg catalog, database, table, and insert some data into the iceberg table
+        tEnv.executeSql(icebergCatalogDdl(icebergIsHive));
+
+        String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_");
+        tEnv.executeSql("USE CATALOG my_iceberg");
+        if (isPartitioned) {
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)",
+                            icebergTable));
+        } else {
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')",
+                            icebergTable));
+        }
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)",
+                                icebergTable))
+                .await();
+
+        tEnv.executeSql(paimonCatalogDdl(paimonIsHive));
+        tEnv.executeSql("USE CATALOG my_paimon");
+
+        String icebergOptions =
+                icebergIsHive
+                        ? "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:"
"metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + + PORT + : "metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=" + iceTempDir; + if (isNamedArgument) { + tEnv.executeSql( + String.format( + "CALL sys.migrate_iceberg_table(source_table => 'default.%s', " + + "iceberg_options => '%s')", + icebergTable, icebergOptions)) + .await(); + } else { + tEnv.executeSql( + String.format( + "CALL sys.migrate_iceberg_table('default.%s','%s')", + icebergTable, icebergOptions)) + .await(); + } + + Assertions.assertThatList( + Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) + .containsExactlyInAnyOrderElementsOf( + ImmutableList.copyOf( + tEnv.executeSql( + String.format( + "SELECT * FROM `default`.`%s`", + icebergTable)) + .collect())); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testMigrateIcebergTableAction(boolean isPartitioned) throws Exception { + TableEnvironment tEnv = + TableEnvironmentImpl.create( + EnvironmentSettings.newInstance().inBatchMode().build()); + + // create iceberg catalog, database, table, and insert some data to iceberg table + tEnv.executeSql(icebergCatalogDdl(true)); + + String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); + tEnv.executeSql("USE CATALOG my_iceberg"); + if (isPartitioned) { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", + icebergTable)); + } else { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", + icebergTable)); + } + tEnv.executeSql( + String.format( + "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", + icebergTable)) + .await(); + + String icebergOptions = + "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + + PORT; + + Map catalogConf = new HashMap<>(); + catalogConf.put("warehouse", paiTempDir.toString()); + catalogConf.put("metastore", "hive"); + catalogConf.put("uri", "thrift://localhost:" + PORT); + catalogConf.put("cache-enabled", "false"); + + MigrateIcebergTableAction migrateIcebergTableAction = + new MigrateIcebergTableAction( + "default." + icebergTable, catalogConf, icebergOptions, "", 6); + migrateIcebergTableAction.run(); + + tEnv.executeSql(paimonCatalogDdl(true)); + tEnv.executeSql("USE CATALOG my_paimon"); + Assertions.assertThatList( + Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) + .containsExactlyInAnyOrderElementsOf( + ImmutableList.copyOf( + tEnv.executeSql( + String.format( + "SELECT * FROM `my_paimon`.`default`.`%s`", + icebergTable)) + .collect())); + } + + private String icebergCatalogDdl(boolean isHive) { + return isHive + ? String.format( + "CREATE CATALOG my_iceberg WITH " + + "( 'type' = 'iceberg', 'catalog-type' = 'hive', 'uri' = 'thrift://localhost:%s', " + + "'warehouse' = '%s', 'cache-enabled' = 'false')", + PORT, iceTempDir) + : String.format( + "CREATE CATALOG my_iceberg WITH " + + "( 'type' = 'iceberg', 'catalog-type' = 'hadoop'," + + "'warehouse' = '%s', 'cache-enabled' = 'false' )", + iceTempDir); + } + + private String paimonCatalogDdl(boolean isHive) { + return isHive + ? 
String.format( + "CREATE CATALOG my_paimon WITH " + + "( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:%s', " + + "'warehouse' = '%s', 'cache-enabled' = 'false' )", + PORT, iceTempDir) + : String.format( + "CREATE CATALOG my_paimon WITH ('type' = 'paimon', 'warehouse' = '%s')", + paiTempDir); + } +} diff --git a/paimon-hive/pom.xml b/paimon-hive/pom.xml index c97aceb1b85c..92f32d1336a6 100644 --- a/paimon-hive/pom.xml +++ b/paimon-hive/pom.xml @@ -50,6 +50,7 @@ under the License. 0.9.8 1.12.319 1.19 + 1.19.0