From 681c4f5a5179445f8ac858a71fd204d427b984ad Mon Sep 17 00:00:00 2001 From: tsreaper Date: Thu, 16 Jan 2025 17:50:40 +0800 Subject: [PATCH] [test] Fix the unstable testCloneWithSchemaEvolution --- .../flink/clone/PickFilesForCloneOperator.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java index f58d3acafdb9..512d48170f4a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java @@ -25,6 +25,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.Preconditions; @@ -85,6 +86,20 @@ public void processElement(StreamRecord> streamRecord) th targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true); targetCatalog.createTable( targetIdentifier, newSchemaFromTableSchema(sourceTable.schema()), true); + FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier); + + // Make sure that latest schema file of source and target table are the same, + // so latest schema won't be overwritten in `CopyFileOperator` and the target table can + // always be retrieved from catalog. + SchemaManager sourceSchemaManager = sourceTable.schemaManager(); + SchemaManager targetSchemaManager = targetTable.schemaManager(); + long schemaId = sourceTable.schema().id(); + targetTable + .fileIO() + .copyFile( + sourceSchemaManager.toSchemaPath(schemaId), + targetSchemaManager.toSchemaPath(schemaId), + true); List result = toCloneFileInfos(