
Conversation

@LsomeYeah (Contributor)

Purpose

Linked issue: close #xxx

In PRs #4639 and #4878, we added support for migrating Iceberg tables managed by hadoop-catalog or hive-catalog to Paimon. This PR aims to support migrating Iceberg tables that have undergone schema evolution one or more times.

Paimon stores a schema id in each DataFileMeta so that data files written under an earlier schema can still be read. We therefore extract the schema id used by each Iceberg data file and record it in the corresponding Paimon DataFileMeta, which lets Paimon handle the schema evolution case.
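The mapping described above can be sketched as follows. This is an illustration only, not the PR's actual code: the record types and the migrate method are hypothetical stand-ins for Iceberg manifest entries and Paimon's DataFileMeta, reduced to the one field this PR adds (the schema id).

```java
import java.util.ArrayList;
import java.util.List;

public class SchemaIdCarryOver {

    /** Hypothetical stand-in for an Iceberg manifest entry: file path + schema id. */
    public record IcebergFile(String path, long schemaId) {}

    /** Hypothetical stand-in for Paimon's DataFileMeta carrying a schema id. */
    public record PaimonFileMeta(String path, long schemaId) {}

    /**
     * For every Iceberg data file, record the schema id it was written with in
     * the corresponding Paimon file meta, so readers can later look up the
     * matching migrated Paimon schema.
     */
    public static List<PaimonFileMeta> migrate(List<IcebergFile> files) {
        List<PaimonFileMeta> metas = new ArrayList<>();
        for (IcebergFile f : files) {
            metas.add(new PaimonFileMeta(f.path(), f.schemaId()));
        }
        return metas;
    }
}
```

The key point is that the schema id is carried per file, not per table, so a table whose files span several schema versions remains readable after migration.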

Tests

IcebergMigrateTest#testDeleteColumn
IcebergMigrateTest#testRenameColumn
IcebergMigrateTest#testAddColumn
IcebergMigrateTest#testReorderColumn
IcebergMigrateTest#testUpdateColumn
IcebergMigrateTest#testMigrateWithRandomIcebergEvolution

API and Format

Documentation

…g to paimon

# Conflicts:
#	paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
…mon table to source table if migrating success

[core] alter access permission of getDataTypeFromType()
# Conflicts:
#	paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory

private final InternalMap lowerBounds;
private final InternalMap upperBounds;

// only used for iceberg migrate
Contributor

I think it is better: "only used for migrating iceberg table to paimon with schema evolution".

Contributor Author

Before migration, we do not check whether the Iceberg table has undergone schema evolution. schemaId will be used in all cases for iceberg migration.

// get iceberg current schema
IcebergSchema icebergSchema =
icebergMetadata.schemas().get(icebergMetadata.currentSchemaId());
public List<TableSchema> icebergSchemasToPaimonSchemas(IcebergMetadata icebergMetadata) {
Contributor

private

}
}

public long getSchemaIdFromIcebergManifestFile(Path manifestPath, FileIO fileIO) {
Contributor

private
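A sketch of what getSchemaIdFromIcebergManifestFile needs to do, under an assumption worth stating: Iceberg manifest files are Avro files whose key/value metadata includes a "schema-id" entry (per the Iceberg table spec; older v1 manifests may lack it). To stay self-contained this helper takes the decoded metadata map directly rather than reading the Avro file through FileIO, so it shows only the parsing step, not the PR's actual implementation.

```java
import java.util.Map;

public class ManifestSchemaId {

    /**
     * Extract the schema id from an Iceberg manifest's Avro key/value
     * metadata. Returns -1 when the entry is absent (e.g. old v1 manifests),
     * so the caller can fall back to the table's current schema.
     */
    public static long schemaIdOf(Map<String, String> avroMetadata) {
        String value = avroMetadata.get("schema-id");
        return value == null ? -1L : Long.parseLong(value);
    }
}
```

In the real method, the metadata map would come from the Avro file reader opened on manifestPath via fileIO.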

String targetDatabase,
String targetTableName,
Integer parallelism,
Map<String, String> options,
Contributor

no need for the options arg

ParameterUtils.parseCommaSeparatedKeyValues(properties))
.executeMigrate();
ParameterUtils.parseCommaSeparatedKeyValues(properties));
LOG.info("create migrator success.");
Contributor

I think it is better: "create migrator success and begin executeMigrate."

Map<String, String> catalogConfig,
String icebergProperties,
String tableProperties,
Integer parallelism) {
Contributor

add @Nullable before parallelism

@@ -0,0 +1,90 @@
/*
Contributor

Should we also modify the docs of this action and procedure in this PR?

Contributor Author

Sorry, the content about hive-catalog and the procedure is contained in #4878. This PR should not contain commits that have already been merged, so I'll close this PR and create a new one. The docs for Iceberg migration will be submitted later in a separate PR.

public class IcebergMigrateHiveMetadataFactory implements IcebergMigrateMetadataFactory {
@Override
public String identifier() {
return IcebergOptions.StorageType.HIVE_CATALOG.toString() + "_migrate";
Contributor

no need to call toString()
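The reviewer's point is a general Java rule: the string concatenation operator already converts non-String operands via String.valueOf, which invokes toString(), so an explicit call is redundant. A standalone sketch (the StorageType enum here is a stand-in; the real IcebergOptions.StorageType may override toString(), so the actual identifier string can differ):

```java
public class EnumConcat {

    // Hypothetical stand-in for IcebergOptions.StorageType.
    enum StorageType { HIVE_CATALOG }

    public static String identifier() {
        // Equivalent to StorageType.HIVE_CATALOG.toString() + "_migrate":
        // concatenation implicitly calls toString() on the enum constant.
        return StorageType.HIVE_CATALOG + "_migrate";
    }
}
```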


/** Factory to create {@link IcebergMigrateHiveMetadata}. */
public class IcebergMigrateHiveMetadataFactory implements IcebergMigrateMetadataFactory {
@Override
Contributor

add a blank line

public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata {
private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrateHiveMetadata.class);

public static final String TABLE_TYPE_PROP = "table_type";
Contributor

private

@LsomeYeah closed this on Feb 14, 2025