diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java index f58d3acafdb9..512d48170f4a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesForCloneOperator.java @@ -25,6 +25,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.Preconditions; @@ -85,6 +86,20 @@ public void processElement(StreamRecord> streamRecord) th targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true); targetCatalog.createTable( targetIdentifier, newSchemaFromTableSchema(sourceTable.schema()), true); + FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier); + + // Make sure that latest schema file of source and target table are the same, + // so latest schema won't be overwritten in `CopyFileOperator` and the target table can + // always be retrieved from catalog. + SchemaManager sourceSchemaManager = sourceTable.schemaManager(); + SchemaManager targetSchemaManager = targetTable.schemaManager(); + long schemaId = sourceTable.schema().id(); + targetTable + .fileIO() + .copyFile( + sourceSchemaManager.toSchemaPath(schemaId), + targetSchemaManager.toSchemaPath(schemaId), + true); List result = toCloneFileInfos(