diff --git a/paimon-flink/paimon-flink-cdc/pom.xml b/paimon-flink/paimon-flink-cdc/pom.xml index 6c73b8ebbf70..9499b39740dd 100644 --- a/paimon-flink/paimon-flink-cdc/pom.xml +++ b/paimon-flink/paimon-flink-cdc/pom.xml @@ -35,8 +35,8 @@ under the License. 1.20.1 - 3.1.1 - 3.1.1 + 3.5.0 + 3.5.0 1.11.4 2.2.0 2.9.0 diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java new file mode 100644 index 000000000000..db32c24b043a --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.base.relational.connection; + +import com.zaxxer.hikari.HikariDataSource; +import org.apache.flink.cdc.common.annotation.VisibleForTesting; +import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig; +import org.apache.flink.util.FlinkRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Copied from Flink + * CDC 3.5.0. Modified method {@link JdbcConnectionPools#clear()} at lines 92 ~ 94.
+ */ +public class JdbcConnectionPools implements ConnectionPools { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionPools.class); + + private static JdbcConnectionPools instance; + private final Map<ConnectionPoolId, HikariDataSource> pools = new HashMap<>(); + private static final Map<String, JdbcConnectionPoolFactory> POOL_FACTORY_MAP = new HashMap<>(); + + private JdbcConnectionPools() {} + + public static synchronized JdbcConnectionPools getInstance( + JdbcConnectionPoolFactory jdbcConnectionPoolFactory) { + if (instance == null) { + instance = new JdbcConnectionPools(); + } + POOL_FACTORY_MAP.put( + jdbcConnectionPoolFactory.getClass().getName(), jdbcConnectionPoolFactory); + return instance; + } + + @Override + public HikariDataSource getOrCreateConnectionPool( + ConnectionPoolId poolId, JdbcSourceConfig sourceConfig) { + synchronized (pools) { + if (!pools.containsKey(poolId)) { + LOG.info("Create and register connection pool {}", poolId); + JdbcConnectionPoolFactory jdbcConnectionPoolFactory = + POOL_FACTORY_MAP.get(poolId.getDataSourcePoolFactoryIdentifier()); + if (jdbcConnectionPoolFactory == null) { + throw new FlinkRuntimeException( + String.format( + "Pool factory identifier is required for connection pool, but unknown pool factory identifier %s found.", + poolId.getDataSourcePoolFactoryIdentifier())); + } + pools.put(poolId, jdbcConnectionPoolFactory.createPooledDataSource(sourceConfig)); + } + return pools.get(poolId); + } + } + + /** This method is only supported for tests. */ + @VisibleForTesting + public String getJdbcUrl( + JdbcSourceConfig sourceConfig, String dataSourcePoolFactoryIdentifier) { + JdbcConnectionPoolFactory jdbcConnectionPoolFactory = + POOL_FACTORY_MAP.get(dataSourcePoolFactoryIdentifier); + if (jdbcConnectionPoolFactory == null) { + throw new FlinkRuntimeException( + String.format( + "Pool factory identifier is required for connection pools, but unknown pool factory identifier %s found.", + dataSourcePoolFactoryIdentifier)); + } + return jdbcConnectionPoolFactory.getJdbcUrl(sourceConfig); + } + + public void clear() throws IOException { + // See org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools#clear. + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java new file mode 100644 index 000000000000..5fdafcec0e58 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.cdc.connectors.mysql.source.connection; + +import com.zaxxer.hikari.HikariDataSource; +import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Copied from Flink + * CDC 3.5.0. Modified method {@link JdbcConnectionPools#clear()} at lines 60 ~ 66. + */ +public class JdbcConnectionPools implements ConnectionPools { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcConnectionPools.class); + + private static final JdbcConnectionPools INSTANCE = new JdbcConnectionPools(); + private final Map<ConnectionPoolId, HikariDataSource> pools = new HashMap<>(); + + private JdbcConnectionPools() {} + + public static JdbcConnectionPools getInstance() { + return INSTANCE; + } + + @Override + public HikariDataSource getOrCreateConnectionPool( + ConnectionPoolId poolId, MySqlSourceConfig sourceConfig) { + synchronized (pools) { + if (!pools.containsKey(poolId)) { + LOG.info("Create and register connection pool {}", poolId); + pools.put(poolId, PooledDataSourceFactory.createPooledDataSource(sourceConfig)); + } + return pools.get(poolId); + } + } + + public void clear() throws IOException { + // Intentionally no-op. + // + // Flink CDC 3.2+ automatically clears connection pools to avoid connection leakage. + // However, this might accidentally affect two Paimon Action jobs running in a single mini + // cluster. We overwrite this class to keep the same behavior as in CDC 3.1.1. + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java index 5107ba146bfb..9b6c0188367e 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java @@ -186,7 +186,7 @@ public static JdbcIncrementalSource buildPostgresSource( customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); CdcDebeziumDeserializationSchema schema = new CdcDebeziumDeserializationSchema(true, customConverterConfigs); - return sourceBuilder.deserializer(schema).includeSchemaChanges(true).build(); + return sourceBuilder.deserializer(schema).build(); } public static void registerJdbcDriver() { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java new file mode 100644 index 000000000000..6a5010b7728f --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplier.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.schema; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalog; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.DropTableEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.SchemaChangeEventType; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.event.TruncateTableEvent; +import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor; +import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; +import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; +import org.apache.flink.table.factories.FactoryUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument; +import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull; + +/** A {@code MetadataApplier} that applies schema changes to Paimon table. */ +public class PaimonMetadataApplier implements MetadataApplier { + + private static final Logger LOG = LoggerFactory.getLogger(PaimonMetadataApplier.class); + + // Catalog is unSerializable. + private transient Catalog catalog; + + // currently, we set table options for all tables using the same options. 
+ private final Map tableOptions; + + private final Options catalogOptions; + + private final Map> partitionMaps; + + private Set enabledSchemaEvolutionTypes; + + public PaimonMetadataApplier(Options catalogOptions) { + this.catalogOptions = catalogOptions; + this.tableOptions = new HashMap<>(); + this.partitionMaps = new HashMap<>(); + this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes(); + } + + public PaimonMetadataApplier( + Options catalogOptions, + Map tableOptions, + Map> partitionMaps) { + this.catalogOptions = catalogOptions; + this.tableOptions = tableOptions; + this.partitionMaps = partitionMaps; + this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes(); + } + + @Override + public MetadataApplier setAcceptedSchemaEvolutionTypes( + Set schemaEvolutionTypes) { + this.enabledSchemaEvolutionTypes = schemaEvolutionTypes; + return this; + } + + @Override + public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) { + return enabledSchemaEvolutionTypes.contains(schemaChangeEventType); + } + + @Override + public Set getSupportedSchemaEvolutionTypes() { + return Sets.newHashSet( + SchemaChangeEventType.CREATE_TABLE, + SchemaChangeEventType.ADD_COLUMN, + SchemaChangeEventType.DROP_COLUMN, + SchemaChangeEventType.RENAME_COLUMN, + SchemaChangeEventType.ALTER_COLUMN_TYPE); + } + + @Override + public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) + throws SchemaEvolveException { + if (catalog == null) { + FlinkCatalogFactory flinkCatalogFactory = + FactoryUtil.discoverFactory( + FlinkCatalogFactory.class.getClassLoader(), + FlinkCatalogFactory.class, + FlinkCatalogFactory.IDENTIFIER); + FlinkCatalog flinkCatalog = + flinkCatalogFactory.createCatalog( + new FactoryUtil.DefaultCatalogContext( + "flink-catalog", + catalogOptions.toMap(), + null, + FlinkCatalog.class.getClassLoader())); + catalog = flinkCatalog.catalog(); + } + SchemaChangeEventVisitor.visit( + schemaChangeEvent, + addColumnEvent -> { + applyAddColumn(addColumnEvent); + return null; + }, + alterColumnTypeEvent -> { + applyAlterColumnType(alterColumnTypeEvent); + return null; + }, + createTableEvent -> { + applyCreateTable(createTableEvent); + return null; + }, + dropColumnEvent -> { + applyDropColumn(dropColumnEvent); + return null; + }, + dropTableEvent -> { + applyDropTable(dropTableEvent); + return null; + }, + renameColumnEvent -> { + applyRenameColumn(renameColumnEvent); + return null; + }, + truncateTableEvent -> { + applyTruncateTable(truncateTableEvent); + return null; + }); + } + + @Override + public void close() throws Exception { + if (catalog != null) { + catalog.close(); + } + } + + private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveException { + try { + if (!catalog.listDatabases().contains(event.tableId().getSchemaName())) { + catalog.createDatabase(event.tableId().getSchemaName(), true); + } + Schema schema = event.getSchema(); + org.apache.paimon.schema.Schema.Builder builder = + new org.apache.paimon.schema.Schema.Builder(); + schema.getColumns() + .forEach( + (column) -> + builder.column( + column.getName(), + LogicalTypeConversion.toDataType( + DataTypeUtils.toFlinkDataType(column.getType()) + .getLogicalType()), + column.getComment())); + List partitionKeys = new ArrayList<>(); + List primaryKeys = schema.primaryKeys(); + if (partitionMaps.containsKey(event.tableId())) { + partitionKeys.addAll(partitionMaps.get(event.tableId())); + } else if (schema.partitionKeys() != null && !schema.partitionKeys().isEmpty()) { + 
partitionKeys.addAll(schema.partitionKeys()); + } + for (String partitionColumn : partitionKeys) { + if (!primaryKeys.contains(partitionColumn)) { + primaryKeys.add(partitionColumn); + } + } + builder.primaryKey(primaryKeys) + .partitionKeys(partitionKeys) + .comment(schema.comment()) + .options(tableOptions) + .options(schema.options()); + catalog.createTable(tableIdToIdentifier(event), builder.build(), true); + } catch (Catalog.TableAlreadyExistException + | Catalog.DatabaseNotExistException + | Catalog.DatabaseAlreadyExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + + private void applyAddColumn(AddColumnEvent event) throws SchemaEvolveException { + try { + List tableChangeList = applyAddColumnEventWithPosition(event); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); + } catch (Catalog.TableNotExistException + | Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException e) { + if (e instanceof Catalog.ColumnAlreadyExistException) { + LOG.warn("{}, skip it.", e.getMessage()); + } else { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + } + + private List applyAddColumnEventWithPosition(AddColumnEvent event) + throws SchemaEvolveException { + try { + List tableChangeList = new ArrayList<>(); + for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) { + switch (columnWithPosition.getPosition()) { + case FIRST: + tableChangeList.addAll( + SchemaChangeProvider.add( + columnWithPosition, + SchemaChange.Move.first( + columnWithPosition.getAddColumn().getName()))); + break; + case LAST: + tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition)); + break; + case BEFORE: + tableChangeList.addAll( + applyAddColumnWithBeforePosition( + event.tableId().getSchemaName(), + event.tableId().getTableName(), + columnWithPosition)); + break; + case AFTER: + checkNotNull( + columnWithPosition.getExistedColumnName(), + "Existing column name must be provided for AFTER position"); + SchemaChange.Move after = + SchemaChange.Move.after( + columnWithPosition.getAddColumn().getName(), + columnWithPosition.getExistedColumnName()); + tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition, after)); + break; + default: + throw new SchemaEvolveException( + event, + "Unknown column position: " + columnWithPosition.getPosition()); + } + } + return tableChangeList; + } catch (Catalog.TableNotExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + + private List applyAddColumnWithBeforePosition( + String schemaName, + String tableName, + AddColumnEvent.ColumnWithPosition columnWithPosition) + throws Catalog.TableNotExistException { + String existedColumnName = columnWithPosition.getExistedColumnName(); + Table table = catalog.getTable(new Identifier(schemaName, tableName)); + List columnNames = table.rowType().getFieldNames(); + int index = checkColumnPosition(existedColumnName, columnNames); + String columnName = columnWithPosition.getAddColumn().getName(); + return SchemaChangeProvider.add( + columnWithPosition, + (index == 0) + ? 
SchemaChange.Move.first(columnName) + : SchemaChange.Move.after(columnName, columnNames.get(index - 1))); + } + + private int checkColumnPosition(String existedColumnName, List columnNames) { + if (existedColumnName == null) { + return 0; + } + int index = columnNames.indexOf(existedColumnName); + checkArgument(index != -1, "Column %s not found", existedColumnName); + return index; + } + + private void applyDropColumn(DropColumnEvent event) throws SchemaEvolveException { + try { + List tableChangeList = new ArrayList<>(); + event.getDroppedColumnNames() + .forEach((column) -> tableChangeList.addAll(SchemaChangeProvider.drop(column))); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); + } catch (Catalog.TableNotExistException | Catalog.ColumnNotExistException e) { + LOG.warn("Failed to apply DropColumnEvent, skip it.", e); + } catch (Catalog.ColumnAlreadyExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + + private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveException { + try { + List tableChangeList = new ArrayList<>(); + event.getNameMapping() + .forEach( + (oldName, newName) -> + tableChangeList.addAll( + SchemaChangeProvider.rename(oldName, newName))); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); + } catch (Catalog.TableNotExistException + | Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + + private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolveException { + try { + List tableChangeList = new ArrayList<>(); + event.getTypeMapping() + .forEach( + (oldName, newType) -> + tableChangeList.add( + SchemaChangeProvider.updateColumnType( + oldName, newType))); + catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true); + } catch (Catalog.TableNotExistException + | Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException e) { + throw new SchemaEvolveException(event, e.getMessage(), e); + } + } + + private void applyTruncateTable(TruncateTableEvent event) throws SchemaEvolveException { + try { + Table table = catalog.getTable(tableIdToIdentifier(event)); + if (table.options().get("deletion-vectors.enabled").equals("true")) { + throw new UnsupportedSchemaChangeEventException( + event, "Unable to truncate a table with deletion vectors enabled.", null); + } + try (BatchTableCommit batchTableCommit = table.newBatchWriteBuilder().newCommit()) { + batchTableCommit.truncateTable(); + } + } catch (Exception e) { + throw new SchemaEvolveException(event, "Failed to apply truncate table event", e); + } + } + + private void applyDropTable(DropTableEvent event) throws SchemaEvolveException { + try { + catalog.dropTable(tableIdToIdentifier(event), true); + } catch (Catalog.TableNotExistException e) { + throw new SchemaEvolveException(event, "Failed to apply drop table event", e); + } + } + + private static Identifier tableIdToIdentifier(SchemaChangeEvent event) { + return new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/SchemaChangeProvider.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/SchemaChangeProvider.java new file mode 100644 index 000000000000..bd0137a57aee --- /dev/null +++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/schema/SchemaChangeProvider.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.schema; + +import org.apache.paimon.flink.pipeline.cdc.util.FlinkCDCToPaimonTypeConverter; +import org.apache.paimon.schema.SchemaChange; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.types.DataType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * The SchemaChangeProvider class provides static methods to create SchemaChange objects that + * represent different types of schema modifications. + */ +public class SchemaChangeProvider { + + /** + * Creates a SchemaChange object for adding a column without specifying its position. + * + * @param columnWithPosition The ColumnWithPosition object containing the column details and its + * intended position within the schema. + * @return A SchemaChange object representing the addition of a column. + */ + public static List add(AddColumnEvent.ColumnWithPosition columnWithPosition) { + List result = new ArrayList<>(); + result.add( + SchemaChange.addColumn( + columnWithPosition.getAddColumn().getName(), + FlinkCDCToPaimonTypeConverter.convertFlinkCDCDataTypeToPaimonDataType( + columnWithPosition.getAddColumn().getType()), + columnWithPosition.getAddColumn().getComment())); + // if default value express exists, we need to set the default value to the table + // option + Column column = columnWithPosition.getAddColumn(); + Optional.ofNullable( + FlinkCDCToPaimonTypeConverter.convertFlinkCDCDefaultValueToValidValue( + column.getDefaultValueExpression(), column.getType())) + .ifPresent( + value -> { + result.add( + SchemaChange.updateColumnDefaultValue( + new String[] {column.getName()}, value)); + }); + return result; + } + + /** + * Creates a SchemaChange object for adding a column with a specified position. + * + * @param columnWithPosition The ColumnWithPosition object containing the column details and its + * intended position within the schema. + * @param move The move operation to indicate the column's new position. + * @return A SchemaChange object representing the addition of a column with position + * information. 
+ */ + public static List add( + AddColumnEvent.ColumnWithPosition columnWithPosition, SchemaChange.Move move) { + List result = new ArrayList<>(); + result.add( + SchemaChange.addColumn( + columnWithPosition.getAddColumn().getName(), + FlinkCDCToPaimonTypeConverter.convertFlinkCDCDataTypeToPaimonDataType( + columnWithPosition.getAddColumn().getType()), + columnWithPosition.getAddColumn().getComment(), + move)); + // if default value express exists, we need to set the default value to the table + // option + Column column = columnWithPosition.getAddColumn(); + Optional.ofNullable( + FlinkCDCToPaimonTypeConverter.convertFlinkCDCDefaultValueToValidValue( + column.getDefaultValueExpression(), column.getType())) + .ifPresent( + value -> { + result.add( + SchemaChange.updateColumnDefaultValue( + new String[] {column.getName()}, value)); + }); + return result; + } + + /** + * Creates a SchemaChange object to update the data type of a column. + * + * @param oldColumnName The name of the column whose data type is to be updated. + * @param newType The new DataType for the column. + * @return A SchemaChange object representing the update of the column's data type. + */ + public static SchemaChange updateColumnType(String oldColumnName, DataType newType) { + return SchemaChange.updateColumnType( + oldColumnName, + FlinkCDCToPaimonTypeConverter.convertFlinkCDCDataTypeToPaimonDataType(newType)); + } + + /** + * Creates a SchemaChange object for renaming a column. + * + * @param oldColumnName The current name of the column to be renamed. + * @param newColumnName The new name for the column. + * @return A SchemaChange object representing the renaming of a column. + */ + public static List rename(String oldColumnName, String newColumnName) { + List result = new ArrayList<>(); + result.add(SchemaChange.renameColumn(oldColumnName, newColumnName)); + return result; + } + + /** + * Creates a SchemaChange object for dropping a column. + * + * @param columnName The name of the column to be dropped. + * @return A SchemaChange object representing the deletion of a column. + */ + public static List drop(String columnName) { + List result = new ArrayList<>(); + result.add(SchemaChange.dropColumn(columnName)); + return result; + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/FlinkCDCToPaimonDataConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/FlinkCDCToPaimonDataConverter.java new file mode 100644 index 000000000000..8148541fd9ff --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/FlinkCDCToPaimonDataConverter.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.pipeline.cdc.util; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.memory.MemorySegmentUtils; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.utils.Preconditions; + +import org.apache.flink.cdc.common.data.ArrayData; +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.MapData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.binary.BinaryArrayData; +import org.apache.flink.cdc.common.data.binary.BinaryMapData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.OperationType; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; +import org.apache.flink.cdc.common.types.DataTypeRoot; +import org.apache.flink.core.memory.MemorySegment; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +/** Utils for converting Flink CDC data change event to Paimon internal row. */ +public class FlinkCDCToPaimonDataConverter { + + /** Convert Flink CDC data change event to Paimon internal row. */ + public static InternalRow convertDataChangeEventToInternalRow( + DataChangeEvent event, List fieldGetters) { + RecordData recordData = + OperationType.DELETE.equals(event.op()) ? event.before() : event.after(); + RowKind rowKind = OperationType.DELETE.equals(event.op()) ? RowKind.DELETE : RowKind.INSERT; + Preconditions.checkArgument( + recordData.getArity() == fieldGetters.size(), + "Field arity not equal to field getters size of %s", + event.tableId()); + + GenericRow genericRow = new GenericRow(rowKind, recordData.getArity()); + for (int i = 0; i < recordData.getArity(); i++) { + genericRow.setField(i, fieldGetters.get(i).getFieldOrNull(recordData)); + } + return genericRow; + } + + public static List createFieldGetters(List fieldTypes) { + List fieldGetters = new ArrayList<>(); + for (int i = 0; i < fieldTypes.size(); i++) { + fieldGetters.add(createFieldGetter(fieldTypes.get(i), i)); + } + return fieldGetters; + } + + public static RecordData.FieldGetter createFieldGetter(DataType fieldType, int fieldPos) { + final RecordData.FieldGetter fieldGetter; + // Ordered by type root definition + switch (fieldType.getTypeRoot()) { + case CHAR: + case VARCHAR: + fieldGetter = row -> BinaryString.fromString(row.getString(fieldPos).toString()); + break; + case BOOLEAN: + fieldGetter = row -> row.getBoolean(fieldPos); + break; + case BINARY: + case VARBINARY: + fieldGetter = row -> row.getBinary(fieldPos); + break; + case DECIMAL: + final int decimalPrecision = DataTypeChecks.getPrecision(fieldType); + final int decimalScale = DataTypeChecks.getScale(fieldType); + fieldGetter = + row -> { + DecimalData decimalData = + row.getDecimal(fieldPos, decimalPrecision, decimalScale); + return Decimal.fromBigDecimal( + decimalData.toBigDecimal(), decimalPrecision, decimalScale); + }; + break; + case TINYINT: + fieldGetter = row -> row.getByte(fieldPos); + break; + case SMALLINT: + fieldGetter = row -> row.getShort(fieldPos); + break; + case BIGINT: + fieldGetter = row -> row.getLong(fieldPos); + break; + case FLOAT: + fieldGetter = row -> row.getFloat(fieldPos); + break; + case DOUBLE: + 
fieldGetter = row -> row.getDouble(fieldPos); + break; + case INTEGER: + fieldGetter = row -> row.getInt(fieldPos); + break; + case DATE: + fieldGetter = row -> (int) row.getDate(fieldPos).toEpochDay(); + break; + case TIME_WITHOUT_TIME_ZONE: + fieldGetter = row -> (int) row.getTime(fieldPos).toMillisOfDay(); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + fieldGetter = + row -> + Timestamp.fromSQLTimestamp( + row.getTimestamp( + fieldPos, + DataTypeChecks.getPrecision(fieldType)) + .toTimestamp()); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case TIMESTAMP_WITH_TIME_ZONE: + fieldGetter = + row -> + Timestamp.fromInstant( + row.getLocalZonedTimestampData( + fieldPos, + DataTypeChecks.getPrecision(fieldType)) + .toInstant()); + break; + case ROW: + final int rowFieldCount = DataTypeChecks.getFieldCount(fieldType); + fieldGetter = new BinaryFieldDataGetter(fieldPos, DataTypeRoot.ROW, rowFieldCount); + break; + case ARRAY: + case MAP: + fieldGetter = new BinaryFieldDataGetter(fieldPos, fieldType.getTypeRoot()); + break; + default: + throw new IllegalArgumentException( + "Don't support type of " + fieldType.getTypeRoot()); + } + if (!fieldType.isNullable()) { + return fieldGetter; + } + return row -> { + if (row.isNullAt(fieldPos)) { + return null; + } + return fieldGetter.getFieldOrNull(row); + }; + } + + /** A helper class to create FieldGetter and GenericRow. */ + public static class BinaryFieldDataGetter implements RecordData.FieldGetter { + private final int fieldPos; + private final DataTypeRoot dataTypeRoot; + private final int rowFieldCount; + + BinaryFieldDataGetter(int fieldPos, DataTypeRoot dataTypeRoot) { + this(fieldPos, dataTypeRoot, -1); + } + + BinaryFieldDataGetter(int fieldPos, DataTypeRoot dataTypeRoot, int rowFieldCount) { + this.fieldPos = fieldPos; + this.dataTypeRoot = dataTypeRoot; + this.rowFieldCount = rowFieldCount; + } + + @Override + public Object getFieldOrNull(RecordData row) { + switch (dataTypeRoot) { + case ARRAY: + return getArrayField(row); + case MAP: + return getMapField(row); + case ROW: + return getRecordField(row); + default: + throw new IllegalArgumentException("Unsupported field type: " + dataTypeRoot); + } + } + + private Object getArrayField(RecordData row) { + ArrayData arrayData = row.getArray(fieldPos); + if (!(arrayData instanceof BinaryArrayData)) { + throw new IllegalArgumentException( + "Expected BinaryArrayData but was " + arrayData.getClass().getSimpleName()); + } + BinaryArrayData binaryArrayData = (BinaryArrayData) arrayData; + return convertSegments( + binaryArrayData.getSegments(), + binaryArrayData.getOffset(), + binaryArrayData.getSizeInBytes(), + MemorySegmentUtils::readArrayData); + } + + private Object getMapField(RecordData row) { + MapData mapData = row.getMap(fieldPos); + if (!(mapData instanceof BinaryMapData)) { + throw new IllegalArgumentException( + "Expected BinaryMapData but was " + mapData.getClass().getSimpleName()); + } + BinaryMapData binaryMapData = (BinaryMapData) mapData; + return convertSegments( + binaryMapData.getSegments(), + binaryMapData.getOffset(), + binaryMapData.getSizeInBytes(), + MemorySegmentUtils::readMapData); + } + + private Object getRecordField(RecordData row) { + RecordData recordData = row.getRow(fieldPos, rowFieldCount); + if (!(recordData instanceof BinaryRecordData)) { + throw new IllegalArgumentException( + "Expected BinaryRecordData but was " + + recordData.getClass().getSimpleName()); + } + BinaryRecordData binaryRecordData = (BinaryRecordData) recordData; + return convertSegments( + 
binaryRecordData.getSegments(), + binaryRecordData.getOffset(), + binaryRecordData.getSizeInBytes(), + (segments, offset, sizeInBytes) -> + MemorySegmentUtils.readRowData( + segments, rowFieldCount, offset, sizeInBytes)); + } + + private T convertSegments( + MemorySegment[] segments, + int offset, + int sizeInBytes, + SegmentConverter converter) { + org.apache.paimon.memory.MemorySegment[] paimonMemorySegments = + new org.apache.paimon.memory.MemorySegment[segments.length]; + for (int i = 0; i < segments.length; i++) { + MemorySegment currMemorySegment = segments[i]; + ByteBuffer byteBuffer = currMemorySegment.wrap(0, currMemorySegment.size()); + + // Allocate a new byte array and copy the data from the ByteBuffer + byte[] bytes = new byte[currMemorySegment.size()]; + byteBuffer.get(bytes); + + paimonMemorySegments[i] = org.apache.paimon.memory.MemorySegment.wrap(bytes); + } + return converter.convert(paimonMemorySegments, offset, sizeInBytes); + } + + private interface SegmentConverter { + T convert( + org.apache.paimon.memory.MemorySegment[] segments, int offset, int sizeInBytes); + } + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/FlinkCDCToPaimonTypeConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/FlinkCDCToPaimonTypeConverter.java new file mode 100644 index 000000000000..5ee775ef2035 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/FlinkCDCToPaimonTypeConverter.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.util; + +import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataType; + +import org.apache.flink.cdc.common.types.LocalZonedTimestampType; +import org.apache.flink.cdc.common.types.TimestampType; +import org.apache.flink.cdc.common.types.ZonedTimestampType; +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; + +/** Type converter from Flink CDC to Paimon. */ +public class FlinkCDCToPaimonTypeConverter { + + public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00"; + + public static final String INVALID_OR_MISSING_DATETIME = "0000-00-00 00:00:00"; + + /** Convert Flink CDC schema to Paimon schema. 
*/ + public static Schema convertFlinkCDCSchemaToPaimonSchema( + org.apache.flink.cdc.common.schema.Schema schema) { + Schema.Builder builder = new Schema.Builder(); + schema.getColumns() + .forEach( + (column) -> + builder.column( + column.getName(), + convertFlinkCDCDataTypeToPaimonDataType(column.getType()), + column.getComment(), + convertFlinkCDCDefaultValueToValidValue( + column.getDefaultValueExpression(), + column.getType()))); + builder.primaryKey(schema.primaryKeys()) + .partitionKeys(schema.partitionKeys()) + .comment(schema.comment()) + .options(schema.options()); + return builder.build(); + } + + /** Convert Flink CDC data type to Paimon data type. */ + public static DataType convertFlinkCDCDataTypeToPaimonDataType( + org.apache.flink.cdc.common.types.DataType dataType) { + return LogicalTypeConversion.toDataType( + DataTypeUtils.toFlinkDataType(dataType).getLogicalType()); + } + + /** Convert Flink CDC default value to a valid value of Paimon. */ + public static String convertFlinkCDCDefaultValueToValidValue( + String defaultValue, org.apache.flink.cdc.common.types.DataType dataType) { + if (defaultValue == null) { + return null; + } + + if (dataType instanceof LocalZonedTimestampType + || dataType instanceof TimestampType + || dataType instanceof ZonedTimestampType) { + + if (INVALID_OR_MISSING_DATETIME.equals(defaultValue)) { + return DEFAULT_DATETIME; + } + } + + return defaultValue; + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/PaimonToFlinkCDCDataConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/PaimonToFlinkCDCDataConverter.java new file mode 100644 index 000000000000..45e077c983d4 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/PaimonToFlinkCDCDataConverter.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.pipeline.cdc.util; + +import org.apache.paimon.data.InternalRow; + +import org.apache.flink.cdc.common.data.DateData; +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.TimeData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypeChecks; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** Converter from Paimon to Flink CDC data. */ +public class PaimonToFlinkCDCDataConverter { + + /** Convert Paimon row to Flink CDC data. */ + public static DataChangeEvent convertRowToDataChangeEvent( + TableId tableId, + InternalRow row, + List fieldGetters, + BinaryRecordDataGenerator recordDataGenerator) { + Object[] objects = new Object[row.getFieldCount()]; + for (int i = 0; i < row.getFieldCount(); i++) { + objects[i] = fieldGetters.get(i).getFieldOrNull(row); + } + BinaryRecordData binaryRecordData = recordDataGenerator.generate(objects); + switch (row.getRowKind()) { + case INSERT: + case UPDATE_AFTER: + { + return DataChangeEvent.insertEvent(tableId, binaryRecordData, new HashMap<>()); + } + case DELETE: + case UPDATE_BEFORE: + { + return DataChangeEvent.deleteEvent(tableId, binaryRecordData, new HashMap<>()); + } + default: + throw new IllegalArgumentException("Unsupported RowKind type: " + row.getRowKind()); + } + } + + public static List createFieldGetters(List fieldTypes) { + List fieldGetters = new ArrayList<>(); + for (int i = 0; i < fieldTypes.size(); i++) { + fieldGetters.add(createFieldGetter(fieldTypes.get(i), i)); + } + return fieldGetters; + } + + public static InternalRow.FieldGetter createFieldGetter(DataType fieldType, int fieldPos) { + final InternalRow.FieldGetter fieldGetter; + // ordered by type root definition + switch (fieldType.getTypeRoot()) { + case CHAR: + case VARCHAR: + fieldGetter = + row -> BinaryStringData.fromString(row.getString(fieldPos).toString()); + break; + case BOOLEAN: + fieldGetter = row -> row.getBoolean(fieldPos); + break; + case BINARY: + case VARBINARY: + fieldGetter = row -> row.getBinary(fieldPos); + break; + case DECIMAL: + final int decimalPrecision = DataTypeChecks.getPrecision(fieldType); + final int decimalScale = DataTypeChecks.getScale(fieldType); + fieldGetter = + row -> + DecimalData.fromBigDecimal( + row.getDecimal(fieldPos, decimalPrecision, decimalScale) + .toBigDecimal(), + decimalPrecision, + decimalScale); + break; + case TINYINT: + fieldGetter = row -> row.getByte(fieldPos); + break; + case SMALLINT: + fieldGetter = row -> row.getShort(fieldPos); + break; + case BIGINT: + fieldGetter = row -> row.getLong(fieldPos); + break; + case FLOAT: + fieldGetter = row -> row.getFloat(fieldPos); + break; + case DOUBLE: + fieldGetter = row -> row.getDouble(fieldPos); + break; + case INTEGER: + fieldGetter = row -> row.getInt(fieldPos); + break; + case DATE: + fieldGetter = row -> DateData.fromEpochDay(row.getInt(fieldPos)); + break; + case TIME_WITHOUT_TIME_ZONE: + fieldGetter = row -> TimeData.fromMillisOfDay(row.getInt(fieldPos)); + break; + case 
TIMESTAMP_WITHOUT_TIME_ZONE: + fieldGetter = + row -> + TimestampData.fromTimestamp( + row.getTimestamp( + fieldPos, + DataTypeChecks.getPrecision(fieldType)) + .toSQLTimestamp()); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + fieldGetter = + row -> + LocalZonedTimestampData.fromInstant( + row.getTimestamp( + fieldPos, + DataTypeChecks.getPrecision(fieldType)) + .toInstant()); + break; + default: + throw new IllegalArgumentException("Unsupported type: " + fieldType.getTypeRoot()); + } + if (!fieldType.isNullable()) { + return fieldGetter; + } + return row -> { + if (row.isNullAt(fieldPos)) { + return null; + } + return fieldGetter.getFieldOrNull(row); + }; + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/PaimonToFlinkCDCTypeConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/PaimonToFlinkCDCTypeConverter.java new file mode 100644 index 000000000000..2c369ff9a059 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/pipeline/cdc/util/PaimonToFlinkCDCTypeConverter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.util; + +import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.types.DataType; + +import org.apache.flink.cdc.common.types.utils.DataTypeUtils; +import org.apache.flink.table.types.utils.TypeConversions; + +/** Type converter from Paimon to Flink CDC. */ +public class PaimonToFlinkCDCTypeConverter { + + /** Convert Paimon schema to Flink CDC schema. */ + public static org.apache.flink.cdc.common.schema.Schema convertPaimonSchemaToFlinkCDCSchema( + Schema schema) { + org.apache.flink.cdc.common.schema.Schema.Builder builder = + new org.apache.flink.cdc.common.schema.Schema.Builder(); + schema.fields() + .forEach( + (column) -> + builder.physicalColumn( + column.name(), + convertFlinkCDCDataTypeToPaimonDataType(column.type()), + column.description(), + column.defaultValue())); + builder.primaryKey(schema.primaryKeys()) + .partitionKey(schema.partitionKeys()) + .comment(schema.comment()) + .options(schema.options()); + return builder.build(); + } + + /** Convert Paimon data type to Flink CDC data type. 
*/ + public static org.apache.flink.cdc.common.types.DataType + convertFlinkCDCDataTypeToPaimonDataType(DataType dataType) { + return DataTypeUtils.fromFlinkDataType( + TypeConversions.fromLogicalToDataType( + LogicalTypeConversion.toLogicalType(dataType))); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplierTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplierTest.java new file mode 100644 index 000000000000..9d6497c02fee --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/schema/PaimonMetadataApplierTest.java @@ -0,0 +1,611 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.schema; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.FileSystemCatalogFactory; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.FlinkCatalogFactory; +import org.apache.paimon.flink.pipeline.cdc.util.FlinkCDCToPaimonTypeConverter; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.DropColumnEvent; +import org.apache.flink.cdc.common.event.RenameColumnEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.common.types.DataType; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** Tests for {@link PaimonMetadataApplier}. 
*/ +class PaimonMetadataApplierTest { + + @TempDir public static java.nio.file.Path temporaryFolder; + + private static Catalog catalog; + + private static Options catalogOptions; + + private static MetadataApplier metadataApplier; + + @BeforeAll + public static void initialize() { + catalogOptions = new Options(); + catalogOptions.setString( + CatalogOptions.METASTORE.key(), FileSystemCatalogFactory.IDENTIFIER); + catalogOptions.setString( + CatalogOptions.WAREHOUSE.key(), + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString()); + catalogOptions.set(CatalogOptions.CACHE_ENABLED.key(), "false"); + catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + + Map tableOptions = new HashMap<>(); + tableOptions.put(CoreOptions.BUCKET.key(), "-1"); + metadataApplier = new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + } + + @AfterAll + public static void close() throws Exception { + if (catalog != null) { + catalog.close(); + } + if (metadataApplier != null) { + metadataApplier.close(); + } + } + + @Test + void testApplySchemaChange() throws Catalog.TableNotExistException, SchemaEvolveException { + String databaseName = "test_" + UUID.randomUUID(); + TableId tableId = TableId.tableId(databaseName, "table1"); + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", org.apache.flink.cdc.common.types.DataTypes.INT()) + .primaryKey("col1") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + RowType rowType = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()))); + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(rowType); + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "col3", + org.apache.flink.cdc.common.types.DataTypes.STRING(), + null, + "col3DefValue"))); + AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + rowType = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()), + new DataField( + 2, "col3", DataTypes.STRING(), null, "col3DefValue"))); + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(rowType); + + Map nameMapping = new HashMap<>(); + nameMapping.put("col2", "newcol2"); + nameMapping.put("col3", "newcol3"); + RenameColumnEvent renameColumnEvent = new RenameColumnEvent(tableId, nameMapping); + metadataApplier.applySchemaChange(renameColumnEvent); + rowType = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "newcol2", DataTypes.INT()), + new DataField( + 2, "newcol3", DataTypes.STRING(), null, "col3DefValue"))); + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(rowType); + + Map typeMapping = new HashMap<>(); + typeMapping.put("newcol2", org.apache.flink.cdc.common.types.DataTypes.STRING()); + AlterColumnTypeEvent alterColumnTypeEvent = + new AlterColumnTypeEvent(TableId.parse(tableId.identifier()), typeMapping); + 
metadataApplier.applySchemaChange(alterColumnTypeEvent); + rowType = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "newcol2", DataTypes.STRING()), + new DataField( + 2, "newcol3", DataTypes.STRING(), null, "col3DefValue"))); + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(rowType); + + DropColumnEvent dropColumnEvent = + new DropColumnEvent(tableId, Collections.singletonList("newcol2")); + metadataApplier.applySchemaChange(dropColumnEvent); + // id of DataField should keep the same as before dropping column + rowType = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField( + 2, "newcol3", DataTypes.STRING(), null, "col3DefValue"))); + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(rowType); + + // Create table with partition column. + tableId = TableId.tableId(databaseName, "table_with_partition"); + createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", org.apache.flink.cdc.common.types.DataTypes.INT()) + .physicalColumn( + "dt", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .primaryKey("col1", "dt") + .partitionKey("dt") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + rowType = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()), + new DataField(2, "dt", DataTypes.INT().notNull()))); + Table tableWithPartition = catalog.getTable(Identifier.fromString(tableId.identifier())); + Assertions.assertThat(tableWithPartition.rowType()).isEqualTo(rowType); + Assertions.assertThat(tableWithPartition.primaryKeys()) + .isEqualTo(Arrays.asList("col1", "dt")); + // Create table with upper case. 
+        tableId = TableId.tableId(databaseName, "table_with_upper_case");
+        catalogOptions.setString(CatalogOptions.CASE_SENSITIVE.key(), "true");
+        PaimonMetadataApplier anotherMetadataApplier = new PaimonMetadataApplier(catalogOptions);
+        createTableEvent =
+                new CreateTableEvent(
+                        tableId,
+                        org.apache.flink.cdc.common.schema.Schema.newBuilder()
+                                .physicalColumn(
+                                        "COL1",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING()
+                                                .notNull())
+                                .physicalColumn(
+                                        "col2", org.apache.flink.cdc.common.types.DataTypes.INT())
+                                .primaryKey("COL1")
+                                .build());
+        anotherMetadataApplier.applySchemaChange(createTableEvent);
+        rowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "COL1", DataTypes.STRING().notNull()),
+                                new DataField(1, "col2", DataTypes.INT())));
+        Assertions.assertThat(
+                        catalog.getTable(Identifier.fromString(tableId.identifier())).rowType())
+                .isEqualTo(rowType);
+    }
+
+    @Test
+    public void testCreateTableWithoutPrimaryKey()
+            throws Catalog.TableNotExistException, SchemaEvolveException {
+        String databaseName = "test_" + UUID.randomUUID();
+        TableId tableId = TableId.tableId(databaseName, "table1");
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(
+                        tableId,
+                        org.apache.flink.cdc.common.schema.Schema.newBuilder()
+                                .physicalColumn(
+                                        "col1",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING()
+                                                .notNull())
+                                .physicalColumn(
+                                        "col2",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING())
+                                .physicalColumn(
+                                        "col3",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING())
+                                .physicalColumn(
+                                        "col4",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING())
+                                .build());
+        metadataApplier.applySchemaChange(createTableEvent);
+        Table table = catalog.getTable(Identifier.fromString(tableId.identifier()));
+        RowType tableSchema =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "col1", DataTypes.STRING().notNull()),
+                                new DataField(1, "col2", DataTypes.STRING()),
+                                new DataField(2, "col3", DataTypes.STRING()),
+                                new DataField(3, "col4", DataTypes.STRING())));
+        Assertions.assertThat(table.rowType()).isEqualTo(tableSchema);
+        Assertions.assertThat(table.primaryKeys()).isEmpty();
+        Assertions.assertThat(table.partitionKeys()).isEmpty();
+        Assertions.assertThat(table.options()).containsEntry(CoreOptions.BUCKET.key(), "-1");
+    }
+
+    @Test
+    void testCreateTableWithOptions() throws Catalog.TableNotExistException, SchemaEvolveException {
+        String databaseName = "test_" + UUID.randomUUID();
+        TableId tableId = TableId.tableId(databaseName, "table1");
+        Map<TableId, List<String>> partitionMaps = new HashMap<>();
+        partitionMaps.put(tableId, Arrays.asList("col3", "col4"));
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+        MetadataApplier anotherMetadataApplier =
+                new PaimonMetadataApplier(catalogOptions, tableOptions, partitionMaps);
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(
+                        tableId,
+                        org.apache.flink.cdc.common.schema.Schema.newBuilder()
+                                .physicalColumn(
+                                        "col1",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING()
+                                                .notNull())
+                                .physicalColumn(
+                                        "col2",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING())
+                                .physicalColumn(
+                                        "col3",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING())
+                                .physicalColumn(
+                                        "col4",
+                                        org.apache.flink.cdc.common.types.DataTypes.STRING())
+                                .primaryKey("col1", "col3", "col4")
+                                .build());
+        anotherMetadataApplier.applySchemaChange(createTableEvent);
+        Table table = catalog.getTable(Identifier.fromString(tableId.identifier()));
+        RowType tableSchema =
+                new RowType(
+
Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.STRING()), + new DataField(2, "col3", DataTypes.STRING().notNull()), + new DataField(3, "col4", DataTypes.STRING().notNull()))); + Assertions.assertThat(table.rowType()).isEqualTo(tableSchema); + Assertions.assertThat(table.primaryKeys()).isEqualTo(Arrays.asList("col1", "col3", "col4")); + Assertions.assertThat(table.partitionKeys()).isEqualTo(Arrays.asList("col3", "col4")); + Assertions.assertThat(table.options()).containsEntry(CoreOptions.BUCKET.key(), "-1"); + } + + @Test + void testCreateTableWithAllDataTypes() + throws Catalog.TableNotExistException, SchemaEvolveException { + String databaseName = "test_" + UUID.randomUUID(); + TableId tableId = TableId.tableId(databaseName, "table1"); + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "boolean", + org.apache.flink.cdc.common.types.DataTypes.BOOLEAN()) + .physicalColumn( + "binary", + org.apache.flink.cdc.common.types.DataTypes.BINARY(3)) + .physicalColumn( + "varbinary", + org.apache.flink.cdc.common.types.DataTypes.VARBINARY(10)) + .physicalColumn( + "bytes", + org.apache.flink.cdc.common.types.DataTypes.BYTES()) + .physicalColumn( + "tinyint", + org.apache.flink.cdc.common.types.DataTypes.TINYINT()) + .physicalColumn( + "smallint", + org.apache.flink.cdc.common.types.DataTypes.SMALLINT()) + .physicalColumn( + "int", org.apache.flink.cdc.common.types.DataTypes.INT()) + .physicalColumn( + "float", + org.apache.flink.cdc.common.types.DataTypes.FLOAT()) + .physicalColumn( + "double", + org.apache.flink.cdc.common.types.DataTypes.DOUBLE()) + .physicalColumn( + "decimal", + org.apache.flink.cdc.common.types.DataTypes.DECIMAL(6, 3)) + .physicalColumn( + "char", org.apache.flink.cdc.common.types.DataTypes.CHAR(5)) + .physicalColumn( + "varchar", + org.apache.flink.cdc.common.types.DataTypes.VARCHAR(10)) + .physicalColumn( + "string", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .physicalColumn( + "date", org.apache.flink.cdc.common.types.DataTypes.DATE()) + .physicalColumn( + "time", org.apache.flink.cdc.common.types.DataTypes.TIME()) + .physicalColumn( + "time_with_precision", + org.apache.flink.cdc.common.types.DataTypes.TIME(6)) + .physicalColumn( + "timestamp", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP()) + .physicalColumn( + "timestamp_with_precision", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(3)) + .physicalColumn( + "timestamp_ltz", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ()) + .physicalColumn( + "timestamp_ltz_with_precision", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ( + 3)) + .primaryKey("col1") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + RowType rowType = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "boolean", DataTypes.BOOLEAN()), + new DataField(2, "binary", DataTypes.BINARY(3)), + new DataField(3, "varbinary", DataTypes.VARBINARY(10)), + new DataField(4, "bytes", DataTypes.BYTES()), + new DataField(5, "tinyint", DataTypes.TINYINT()), + new DataField(6, "smallint", DataTypes.SMALLINT()), + new DataField(7, "int", DataTypes.INT()), + new DataField(8, "float", DataTypes.FLOAT()), + new DataField(9, "double", DataTypes.DOUBLE()), + new DataField(10, "decimal", 
DataTypes.DECIMAL(6, 3)), + new DataField(11, "char", DataTypes.CHAR(5)), + new DataField(12, "varchar", DataTypes.VARCHAR(10)), + new DataField(13, "string", DataTypes.STRING()), + new DataField(14, "date", DataTypes.DATE()), + new DataField(15, "time", DataTypes.TIME(0)), + new DataField(16, "time_with_precision", DataTypes.TIME(6)), + new DataField(17, "timestamp", DataTypes.TIMESTAMP(6)), + new DataField( + 18, "timestamp_with_precision", DataTypes.TIMESTAMP(3)), + new DataField( + 19, + "timestamp_ltz", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)), + new DataField( + 20, + "timestamp_ltz_with_precision", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)))); + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(rowType); + } + + @Test + void testAddColumnWithPosition() throws Catalog.TableNotExistException, SchemaEvolveException { + String databaseName = "test_" + UUID.randomUUID(); + TableId tableId = TableId.tableId(databaseName, "table1"); + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", org.apache.flink.cdc.common.types.DataTypes.INT()) + .primaryKey("col1") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "col3", + org.apache.flink.cdc.common.types.DataTypes + .STRING()))); // default last position. + AddColumnEvent addColumnEvent = new AddColumnEvent(tableId, addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + RowType tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()), + new DataField(2, "col3", DataTypes.STRING()))); + + Assertions.assertThat( + catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(tableSchema); + + addedColumns.clear(); + + addedColumns.add( + AddColumnEvent.before( + Column.physicalColumn( + "col4_first_before", + org.apache.flink.cdc.common.types.DataTypes.STRING()), + "col1")); + addedColumns.add( + AddColumnEvent.first( + Column.physicalColumn( + "col4_first", + org.apache.flink.cdc.common.types.DataTypes.STRING()))); + addedColumns.add( + AddColumnEvent.last( + Column.physicalColumn( + "col5_last", + org.apache.flink.cdc.common.types.DataTypes.STRING()))); + addedColumns.add( + AddColumnEvent.before( + Column.physicalColumn( + "col6_before", + org.apache.flink.cdc.common.types.DataTypes.STRING()), + "col2")); + addedColumns.add( + AddColumnEvent.after( + Column.physicalColumn( + "col7_after", org.apache.flink.cdc.common.types.DataTypes.STRING()), + "col2")); + + addColumnEvent = new AddColumnEvent(tableId, addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + + tableSchema = + new RowType( + Arrays.asList( + new DataField(4, "col4_first", DataTypes.STRING()), + new DataField(3, "col4_first_before", DataTypes.STRING()), + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(6, "col6_before", DataTypes.STRING()), + new DataField(1, "col2", DataTypes.INT()), + new DataField(7, "col7_after", DataTypes.STRING()), + new DataField(2, "col3", DataTypes.STRING()), + new DataField(5, "col5_last", DataTypes.STRING()))); + + Assertions.assertThat( + 
catalog.getTable(Identifier.fromString(tableId.identifier())).rowType()) + .isEqualTo(tableSchema); + } + + @Test + public void testCreateTableWithComment() + throws Catalog.TableNotExistException, SchemaEvolveException { + String databaseName = "test_" + UUID.randomUUID(); + TableId tableId = TableId.tableId(databaseName, "test.table_with_comment"); + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull(), + "comment of col1") + .physicalColumn( + "col2", + org.apache.flink.cdc.common.types.DataTypes.STRING(), + "comment of col2") + .physicalColumn( + "col3", + org.apache.flink.cdc.common.types.DataTypes.STRING(), + "comment of col3") + .physicalColumn( + "col4", + org.apache.flink.cdc.common.types.DataTypes.STRING(), + "comment of col4") + .comment("comment of table_with_comment") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + Table table = catalog.getTable(Identifier.fromString(tableId.identifier())); + RowType tableSchema = + new RowType( + Arrays.asList( + new DataField( + 0, "col1", DataTypes.STRING().notNull(), "comment of col1"), + new DataField(1, "col2", DataTypes.STRING(), "comment of col2"), + new DataField(2, "col3", DataTypes.STRING(), "comment of col3"), + new DataField(3, "col4", DataTypes.STRING(), "comment of col4"))); + Assertions.assertThat(table.rowType()).isEqualTo(tableSchema); + Assertions.assertThat(table.primaryKeys()).isEmpty(); + Assertions.assertThat(table.partitionKeys()).isEmpty(); + Assertions.assertThat(table.options()).containsEntry("bucket", "-1"); + Assertions.assertThat(table.comment()).contains("comment of table_with_comment"); + } + + @Test + public void testInvalidTimestampDefaultValueConversionInAddColumn() + throws SchemaEvolveException, Catalog.TableNotExistException { + String databaseName = "test_" + UUID.randomUUID(); + TableId tableId = TableId.tableId(databaseName, "timestamp_test"); + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "id", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .physicalColumn( + "name", + org.apache.flink.cdc.common.types.DataTypes.STRING()) + .primaryKey("id") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + + List addedColumns = new ArrayList<>(); + addedColumns.add( + AddColumnEvent.last( + Column.physicalColumn( + "created_time", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP(), + null, + FlinkCDCToPaimonTypeConverter.INVALID_OR_MISSING_DATETIME))); + addedColumns.add( + AddColumnEvent.last( + Column.physicalColumn( + "updated_time", + org.apache.flink.cdc.common.types.DataTypes.TIMESTAMP_LTZ(), + null, + FlinkCDCToPaimonTypeConverter.INVALID_OR_MISSING_DATETIME))); + + AddColumnEvent addColumnEvent = + new AddColumnEvent(TableId.parse(tableId.identifier()), addedColumns); + metadataApplier.applySchemaChange(addColumnEvent); + + Table table = catalog.getTable(Identifier.fromString(tableId.identifier())); + + Assertions.assertThat(table).isNotNull(); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/util/DataConvertTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/util/DataConvertTest.java new file mode 100644 index 000000000000..aeb6c3b7d7a9 --- /dev/null +++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/util/DataConvertTest.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.util; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.utils.InternalRowUtils; + +import org.apache.flink.cdc.common.data.DateData; +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.TimeData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.event.DataChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Data convert test for {@link PaimonToFlinkCDCDataConverter} and {@link + * FlinkCDCToPaimonDataConverter}. 
+ */ +public class DataConvertTest { + + @Test + void testFullTypesConversion() { + Schema fullTypesSchema = + Schema.newBuilder() + .physicalColumn("pk_string", DataTypes.STRING().notNull()) + .physicalColumn("boolean", DataTypes.BOOLEAN()) + .physicalColumn("binary", DataTypes.BINARY(3)) + .physicalColumn("varbinary", DataTypes.VARBINARY(10)) + .physicalColumn("bytes", DataTypes.BYTES()) + .physicalColumn("tinyint", DataTypes.TINYINT()) + .physicalColumn("smallint", DataTypes.SMALLINT()) + .physicalColumn("int", DataTypes.INT()) + .physicalColumn("bigint", DataTypes.BIGINT()) + .physicalColumn("float", DataTypes.FLOAT()) + .physicalColumn("double", DataTypes.DOUBLE()) + .physicalColumn("decimal", DataTypes.DECIMAL(6, 3)) + .physicalColumn("char", DataTypes.CHAR(5)) + .physicalColumn("varchar", DataTypes.VARCHAR(10)) + .physicalColumn("string", DataTypes.STRING()) + .physicalColumn("date", DataTypes.DATE()) + .physicalColumn("time", DataTypes.TIME()) + .physicalColumn("time_with_precision", DataTypes.TIME(6)) + .physicalColumn("timestamp", DataTypes.TIMESTAMP()) + .physicalColumn("timestamp_with_precision_3", DataTypes.TIMESTAMP(3)) + .physicalColumn("timestamp_with_precision_6", DataTypes.TIMESTAMP(6)) + .physicalColumn("timestamp_with_precision_9", DataTypes.TIMESTAMP(9)) + .physicalColumn("timestamp_ltz", DataTypes.TIMESTAMP_LTZ()) + .physicalColumn( + "timestamp_ltz_with_precision_3", DataTypes.TIMESTAMP_LTZ(3)) + .physicalColumn( + "timestamp_ltz_with_precision_6", DataTypes.TIMESTAMP_LTZ(6)) + .physicalColumn( + "timestamp_ltz_with_precision_9", DataTypes.TIMESTAMP_LTZ(9)) + .primaryKey("pk_string") + .partitionKey("boolean") + .build(); + + Object[] testData = + new Object[] { + BinaryStringData.fromString("pk_string"), + true, + new byte[] {1, 2, 3}, + new byte[] {4, 5, 6}, + new byte[] {7, 8, 9}, + (byte) 1, + (short) 2, + 3, + 4L, + 5.1f, + 6.2, + DecimalData.fromBigDecimal(new BigDecimal("7.123"), 6, 3), + BinaryStringData.fromString("test1"), + BinaryStringData.fromString("test2"), + BinaryStringData.fromString("test3"), + DateData.fromEpochDay(1000), + TimeData.fromMillisOfDay(200), + TimeData.fromMillisOfDay(300), + TimestampData.fromMillis(100, 1), + TimestampData.fromMillis(200, 2), + TimestampData.fromMillis(300, 3), + TimestampData.fromMillis(400, 4), + LocalZonedTimestampData.fromEpochMillis(300, 3), + LocalZonedTimestampData.fromEpochMillis(400, 4), + LocalZonedTimestampData.fromEpochMillis(500, 5), + LocalZonedTimestampData.fromEpochMillis(600, 6), + }; + + Object[] expectedPaimonData = { + BinaryString.fromString("pk_string"), + true, + new byte[] {1, 2, 3}, + new byte[] {4, 5, 6}, + new byte[] {7, 8, 9}, + (byte) 1, + (short) 2, + 3, + 4L, + 5.1f, + 6.2, + Decimal.fromBigDecimal(new BigDecimal("7.123"), 6, 3), + BinaryString.fromString("test1"), + BinaryString.fromString("test2"), + BinaryString.fromString("test3"), + 1000, + 200, + 300, + Timestamp.fromEpochMillis(100, 1), + Timestamp.fromEpochMillis(200, 2), + Timestamp.fromEpochMillis(300, 3), + Timestamp.fromEpochMillis(400, 4), + Timestamp.fromEpochMillis(300, 3), + Timestamp.fromEpochMillis(400, 4), + Timestamp.fromEpochMillis(500, 5), + Timestamp.fromEpochMillis(600, 6), + }; + + testConvertBackAndForth(fullTypesSchema, testData, expectedPaimonData); + + Object[] allNullTestData = + new Object[] { + BinaryStringData.fromString("pk_string"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + 
null, + null, + null, + null + }; + + Object[] expectedAllNullPaimonData = + new Object[] { + BinaryString.fromString("pk_string"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + }; + testConvertBackAndForth(fullTypesSchema, allNullTestData, expectedAllNullPaimonData); + } + + private void testConvertBackAndForth( + Schema cdcSchema, Object[] fields, Object[] expectedInternalRows) { + int arity = fields.length; + TableId tableId = TableId.tableId("testDatabase", "testTable"); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator( + cdcSchema.getColumnDataTypes().toArray(new DataType[0])); + DataChangeEvent event = + DataChangeEvent.insertEvent(tableId, recordDataGenerator.generate(fields)); + org.apache.paimon.schema.Schema paimonSchema = + FlinkCDCToPaimonTypeConverter.convertFlinkCDCSchemaToPaimonSchema(cdcSchema); + + // Step 1: Convert CDC Event to Paimon Event + List cdcFieldGetters = + FlinkCDCToPaimonDataConverter.createFieldGetters(cdcSchema.getColumnDataTypes()); + InternalRow paimonRow = + FlinkCDCToPaimonDataConverter.convertDataChangeEventToInternalRow( + event, cdcFieldGetters); + + // Step 2: Check Paimon Row specs + Assertions.assertThat(paimonRow.getRowKind()).isEqualTo(RowKind.INSERT); + Assertions.assertThat(paimonRow.getFieldCount()).isEqualTo(arity); + InternalRow.FieldGetter[] internalRowFieldGetters = + InternalRowUtils.createFieldGetters(paimonSchema.rowType().getFieldTypes()); + + List internalRowRepresentation = new ArrayList<>(); + for (InternalRow.FieldGetter internalRowFieldGetter : internalRowFieldGetters) { + internalRowRepresentation.add(internalRowFieldGetter.getFieldOrNull(paimonRow)); + } + Assertions.assertThat(internalRowRepresentation).containsExactly(expectedInternalRows); + + // Step 3: Convert it back + List paimonFieldGetters = + PaimonToFlinkCDCDataConverter.createFieldGetters(cdcSchema.getColumnDataTypes()); + DataChangeEvent convertedEvent = + PaimonToFlinkCDCDataConverter.convertRowToDataChangeEvent( + tableId, paimonRow, paimonFieldGetters, recordDataGenerator); + + // Step 4: Validate it + String[] originalFields = Arrays.stream(fields).map(this::stringify).toArray(String[]::new); + Assertions.assertThat(convertedEvent).isEqualTo(event); + Assertions.assertThat( + SchemaUtils.restoreOriginalData( + event.after(), SchemaUtils.createFieldGetters(cdcSchema))) + .map(this::stringify) + .containsExactly(originalFields); + } + + // Stringify byte[] properly for checking. + private String stringify(Object value) { + if (value instanceof byte[]) { + return Arrays.toString((byte[]) value); + } + return Objects.toString(value); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/util/TypeConverterTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/util/TypeConverterTest.java new file mode 100644 index 000000000000..93463235cd2e --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/pipeline/cdc/util/TypeConverterTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.pipeline.cdc.util; + +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataTypes; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.apache.paimon.types.DataTypes.BIGINT; +import static org.apache.paimon.types.DataTypes.BINARY; +import static org.apache.paimon.types.DataTypes.BOOLEAN; +import static org.apache.paimon.types.DataTypes.BYTES; +import static org.apache.paimon.types.DataTypes.CHAR; +import static org.apache.paimon.types.DataTypes.DATE; +import static org.apache.paimon.types.DataTypes.DECIMAL; +import static org.apache.paimon.types.DataTypes.DOUBLE; +import static org.apache.paimon.types.DataTypes.FLOAT; +import static org.apache.paimon.types.DataTypes.INT; +import static org.apache.paimon.types.DataTypes.SMALLINT; +import static org.apache.paimon.types.DataTypes.STRING; +import static org.apache.paimon.types.DataTypes.TIME; +import static org.apache.paimon.types.DataTypes.TIMESTAMP; +import static org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE; +import static org.apache.paimon.types.DataTypes.TINYINT; +import static org.apache.paimon.types.DataTypes.VARBINARY; +import static org.apache.paimon.types.DataTypes.VARCHAR; + +/** + * Type converter test for {@link FlinkCDCToPaimonTypeConverter} and {@link + * PaimonToFlinkCDCTypeConverter}. 
+ */ +public class TypeConverterTest { + + @Test + public void testFullTypesConverter() { + Schema cdcSchema = + Schema.newBuilder() + .physicalColumn("pk_string", DataTypes.STRING().notNull()) + .physicalColumn("boolean", DataTypes.BOOLEAN()) + .physicalColumn("binary", DataTypes.BINARY(3)) + .physicalColumn("varbinary", DataTypes.VARBINARY(10)) + .physicalColumn("bytes", DataTypes.BYTES()) + .physicalColumn("tinyint", DataTypes.TINYINT()) + .physicalColumn("smallint", DataTypes.SMALLINT()) + .physicalColumn("int", DataTypes.INT()) + .physicalColumn("bigint", DataTypes.BIGINT()) + .physicalColumn("float", DataTypes.FLOAT()) + .physicalColumn("double", DataTypes.DOUBLE()) + .physicalColumn("decimal", DataTypes.DECIMAL(6, 3)) + .physicalColumn("char", DataTypes.CHAR(5)) + .physicalColumn("varchar", DataTypes.VARCHAR(10)) + .physicalColumn("string", DataTypes.STRING()) + .physicalColumn("date", DataTypes.DATE()) + .physicalColumn("time", DataTypes.TIME()) + .physicalColumn("time_with_precision", DataTypes.TIME(6)) + .physicalColumn("timestamp", DataTypes.TIMESTAMP()) + .physicalColumn("timestamp_with_precision_3", DataTypes.TIMESTAMP(3)) + .physicalColumn("timestamp_with_precision_6", DataTypes.TIMESTAMP(6)) + .physicalColumn("timestamp_with_precision_9", DataTypes.TIMESTAMP(9)) + .physicalColumn("timestamp_ltz", DataTypes.TIMESTAMP_LTZ()) + .physicalColumn( + "timestamp_ltz_with_precision_3", DataTypes.TIMESTAMP_LTZ(3)) + .physicalColumn( + "timestamp_ltz_with_precision_6", DataTypes.TIMESTAMP_LTZ(6)) + .physicalColumn( + "timestamp_ltz_with_precision_9", DataTypes.TIMESTAMP_LTZ(9)) + .primaryKey("pk_string") + .partitionKey("boolean") + .build(); + + // Step 1: Convert CDC Schema to Paimon Schema + org.apache.paimon.schema.Schema paimonSchema = + FlinkCDCToPaimonTypeConverter.convertFlinkCDCSchemaToPaimonSchema(cdcSchema); + + // Step 2: Validate Paimon Schema + Assertions.assertThat(paimonSchema) + .isEqualTo( + org.apache.paimon.schema.Schema.newBuilder() + .primaryKey("pk_string") + .partitionKeys("boolean") + .column("pk_string", STRING().notNull()) + .column("boolean", BOOLEAN()) + .column("binary", BINARY(3)) + .column("varbinary", VARBINARY(10)) + .column("bytes", BYTES()) + .column("tinyint", TINYINT()) + .column("smallint", SMALLINT()) + .column("int", INT()) + .column("bigint", BIGINT()) + .column("float", FLOAT()) + .column("double", DOUBLE()) + .column("decimal", DECIMAL(6, 3)) + .column("char", CHAR(5)) + .column("varchar", VARCHAR(10)) + .column("string", STRING()) + .column("date", DATE()) + .column("time", TIME(0)) + .column("time_with_precision", TIME(6)) + .column("timestamp", TIMESTAMP(6)) + .column("timestamp_with_precision_3", TIMESTAMP(3)) + .column("timestamp_with_precision_6", TIMESTAMP(6)) + .column("timestamp_with_precision_9", TIMESTAMP(9)) + .column("timestamp_ltz", TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)) + .column( + "timestamp_ltz_with_precision_3", + TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) + .column( + "timestamp_ltz_with_precision_6", + TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)) + .column( + "timestamp_ltz_with_precision_9", + TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)) + .build()); + + // Step 3: Convert it back + Schema recreatedCdcSchema = + PaimonToFlinkCDCTypeConverter.convertPaimonSchemaToFlinkCDCSchema(paimonSchema); + Assertions.assertThat(recreatedCdcSchema).isEqualTo(cdcSchema); + } +}