File: IcebergDataFileMeta.java
@@ -83,6 +83,9 @@ public static Content fromId(int id) {
private final InternalMap lowerBounds;
private final InternalMap upperBounds;

// only used for iceberg migrate
Contributor: I think it would be better as: "only used for migrating iceberg table to paimon with schema evolution".

Contributor Author: Before migration, we do not check whether the Iceberg table has undergone schema evolution, so schemaId will be used in all cases for iceberg migration.

private long schemaId = 0;

IcebergDataFileMeta(
Content content,
String filePath,
@@ -201,6 +204,15 @@ public InternalMap upperBounds() {
return upperBounds;
}

public long schemaId() {
return schemaId;
}

public IcebergDataFileMeta withSchemaId(long schemaId) {
this.schemaId = schemaId;
return this;
}

public static RowType schema(RowType partitionType) {
List<DataField> fields = new ArrayList<>();
fields.add(new DataField(134, "content", DataTypes.INT().notNull()));
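The accessor pair above is what lets the migrator stamp each data file with the schema id of the manifest it came from. A minimal sketch of the intended usage, assuming the same package as IcebergDataFileMeta (the helper itself is illustrative, not part of this PR):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper: tag every live entry's data file with the schema id of
// the Avro manifest it was read from. withSchemaId mutates the meta in place
// and returns it, so the result can be collected directly.
static List<IcebergDataFileMeta> tagWithSchemaId(
        List<IcebergManifestEntry> liveEntries, long manifestSchemaId) {
    List<IcebergDataFileMeta> tagged = new ArrayList<>(liveEntries.size());
    for (IcebergManifestEntry entry : liveEntries) {
        tagged.add(entry.file().withSchemaId(manifestSchemaId));
    }
    return tagged;
}
```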
File: IcebergMigrator.java
@@ -43,19 +43,26 @@
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -136,15 +143,28 @@ public IcebergMigrator(

@Override
public void executeMigrate() throws Exception {
Schema paimonSchema = icebergSchemaToPaimonSchema(icebergMetadata);
List<TableSchema> paimonSchemas = icebergSchemasToPaimonSchemas(icebergMetadata);
Preconditions.checkArgument(
!paimonSchemas.isEmpty(),
"paimon schemas transformed from iceberg table is empty.");
Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableName);

paimonCatalog.createDatabase(paimonDatabaseName, true);
paimonCatalog.createTable(paimonIdentifier, paimonSchema, false);
TableSchema firstSchema = paimonSchemas.get(0);
Preconditions.checkArgument(firstSchema.id() == 0, "Unexpected, first schema id is not 0.");
paimonCatalog.createTable(paimonIdentifier, firstSchema.toSchema(), false);

try {
FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
FileIO fileIO = paimonTable.fileIO();
SchemaManager schemaManager = paimonTable.schemaManager();
// commit all the iceberg schemas
for (int i = 1; i < paimonSchemas.size(); i++) {
LOG.info(
"commit new schema from iceberg, new schema id:{}",
paimonSchemas.get(i).id());
schemaManager.commit(paimonSchemas.get(i));
}

IcebergManifestFile manifestFile =
IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
@@ -157,25 +177,36 @@ public void executeMigrate() throws Exception {
// check manifest file with 'DELETE' kind
checkAndFilterManifestFiles(icebergManifestFileMetas);

// get all live iceberg entries
List<IcebergManifestEntry> icebergEntries =
icebergManifestFileMetas.stream()
.flatMap(fileMeta -> manifestFile.read(fileMeta).stream())
.filter(IcebergManifestEntry::isLive)
.collect(Collectors.toList());
if (icebergEntries.isEmpty()) {
Map<Long, List<IcebergManifestEntry>> icebergEntries = new HashMap<>();
for (IcebergManifestFileMeta icebergManifestFileMeta : icebergManifestFileMetas) {
long schemaId =
getSchemaIdFromIcebergManifestFile(
new Path(icebergManifestFileMeta.manifestPath()), fileIO);
List<IcebergManifestEntry> entries = manifestFile.read(icebergManifestFileMeta);
icebergEntries
.computeIfAbsent(schemaId, v -> new ArrayList<>())
.addAll(
entries.stream()
.filter(IcebergManifestEntry::isLive)
.collect(Collectors.toList()));
}

List<IcebergDataFileMeta> icebergDataFileMetas = new ArrayList<>();
// write schema id to IcebergDataFileMeta
for (Map.Entry<Long, List<IcebergManifestEntry>> kv : icebergEntries.entrySet()) {
icebergDataFileMetas.addAll(
kv.getValue().stream()
.map(entry -> entry.file().withSchemaId(kv.getKey()))
.collect(Collectors.toList()));
}

if (icebergDataFileMetas.isEmpty()) {
LOG.info(
"No live manifest entry in iceberg table for snapshot {}, iceberg table meta path is {}.",
"No live iceberg data files in iceberg table for snapshot {}, iceberg table meta path is {}.",
icebergMetadata.currentSnapshotId(),
icebergLatestMetadataLocation);
return;
}

List<IcebergDataFileMeta> icebergDataFileMetas =
icebergEntries.stream()
.map(IcebergManifestEntry::file)
.collect(Collectors.toList());

// Again, check if delete File exists
checkAndFilterDataFiles(icebergDataFileMetas);

@@ -246,10 +277,21 @@ public void renameTable(boolean ignoreIfNotExists) throws Exception {
paimonCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists);
}

public Schema icebergSchemaToPaimonSchema(IcebergMetadata icebergMetadata) {
// get iceberg current schema
IcebergSchema icebergSchema =
icebergMetadata.schemas().get(icebergMetadata.currentSchemaId());
public List<TableSchema> icebergSchemasToPaimonSchemas(IcebergMetadata icebergMetadata) {
Contributor: This method can be private.

return icebergMetadata.schemas().stream()
.map(
icebergSchema -> {
LOG.info(
"Convert iceberg schema to paimon schema, iceberg schema id: {}",
icebergSchema.schemaId());
return TableSchema.create(
icebergSchema.schemaId(),
icebergSchemaToPaimonSchema(icebergSchema));
})
.collect(Collectors.toList());
}

public Schema icebergSchemaToPaimonSchema(IcebergSchema icebergSchema) {

// get iceberg current partition spec
int currentPartitionSpecId = icebergMetadata.defaultSpecId();
@@ -289,6 +331,18 @@ private void checkAndFilterDataFiles(List<IcebergDataFileMeta> icebergDataFileMe
}
}

public long getSchemaIdFromIcebergManifestFile(Path manifestPath, FileIO fileIO) {
Contributor: This method can be private.

try (DataFileStream<GenericRecord> dataFileStream =
new DataFileStream<>(
fileIO.newInputStream(manifestPath), new GenericDatumReader<>())) {
String schema = dataFileStream.getMetaString("schema");
return JsonSerdeUtil.fromJson(schema, IcebergSchema.class).schemaId();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private static List<DataFileMeta> construct(
List<IcebergDataFileMeta> icebergDataFileMetas,
FileIO fileIO,
@@ -318,7 +372,9 @@ private static DataFileMeta constructFileMeta(
e);
}
String format = icebergDataFileMeta.fileFormat();
return FileMetaUtils.constructFileMeta(format, status, fileIO, table, dir, rollback);
long schemaId = icebergDataFileMeta.schemaId();
return FileMetaUtils.constructFileMeta(
format, status, fileIO, table, dir, rollback, schemaId);
}

private MigrateTask importUnPartitionedTable(
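For context, getSchemaIdFromIcebergManifestFile works because an Iceberg manifest is an Avro file whose header stores the Iceberg schema JSON under the "schema" metadata key. A standalone sketch of the same lookup, using plain Jackson instead of Paimon's JsonSerdeUtil and assuming the schema JSON carries a "schema-id" field (early v1 metadata may omit it):

```java
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.InputStream;

// Standalone sketch: recover the Iceberg schema id from a manifest's Avro
// header without deserializing any of the manifest entries.
static long readIcebergSchemaId(InputStream manifestStream) throws IOException {
    try (DataFileStream<GenericRecord> stream =
            new DataFileStream<>(manifestStream, new GenericDatumReader<>())) {
        String schemaJson = stream.getMetaString("schema");
        // The Iceberg schema JSON stores its id under "schema-id".
        return new ObjectMapper().readTree(schemaJson).get("schema-id").asLong();
    }
}
```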
File: FileMetaUtils.java
@@ -129,6 +129,47 @@ public static DataFileMeta constructFileMeta(
}
}

public static DataFileMeta constructFileMeta(
String format,
FileStatus fileStatus,
FileIO fileIO,
Table table,
Path dir,
Map<Path, Path> rollback,
long schemaId) {

try {
RowType rowTypeWithSchemaId =
((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType();
SimpleColStatsCollector.Factory[] factories =
StatsCollectorFactories.createStatsFactories(
((FileStoreTable) table).coreOptions(),
rowTypeWithSchemaId.getFieldNames());

SimpleStatsExtractor simpleStatsExtractor =
FileFormat.fromIdentifier(
format,
((FileStoreTable) table).coreOptions().toConfiguration())
.createStatsExtractor(rowTypeWithSchemaId, factories)
.orElseThrow(
() ->
new RuntimeException(
"Can't get table stats extractor for format "
+ format));
Path newPath = renameFile(fileIO, fileStatus.getPath(), dir, format, rollback);
return constructFileMeta(
newPath.getName(),
fileStatus.getLen(),
newPath,
simpleStatsExtractor,
fileIO,
table,
schemaId);
} catch (IOException e) {
throw new RuntimeException("error when construct file meta", e);
}
}

// -----------------------------private method---------------------------------------------

private static Path renameFile(
@@ -152,7 +193,29 @@ private static DataFileMeta constructFileMeta(
FileIO fileIO,
Table table)
throws IOException {
SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(table.rowType());
return constructFileMeta(
fileName,
fileSize,
path,
simpleStatsExtractor,
fileIO,
table,
((FileStoreTable) table).schema().id());
}

private static DataFileMeta constructFileMeta(
String fileName,
long fileSize,
Path path,
SimpleStatsExtractor simpleStatsExtractor,
FileIO fileIO,
Table table,
long schemaId)
throws IOException {
RowType rowTypeWithSchemaId =
((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType();

SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(rowTypeWithSchemaId);

Pair<SimpleColStats[], SimpleStatsExtractor.FileInfo> fileInfo =
simpleStatsExtractor.extractWithFileInfo(fileIO, path);
Expand All @@ -165,7 +228,7 @@ private static DataFileMeta constructFileMeta(
stats,
0,
0,
((FileStoreTable) table).schema().id(),
schemaId,
Collections.emptyList(),
null,
FileSource.APPEND,
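The practical difference between the two public overloads above is which schema's row type drives stats extraction. A hedged sketch of a call site (the wrapper is illustrative and not part of this PR; the Paimon types are as imported in the file above):

```java
import java.util.Map;

// Illustrative wrapper: pick the overload based on whether the imported file
// carries a historical Iceberg schema id.
static DataFileMeta constructWithSchema(
        String format,
        FileStatus status,
        FileIO fileIO,
        Table table,
        Path dir,
        Map<Path, Path> rollback,
        Long icebergSchemaId) {
    if (icebergSchemaId == null) {
        // Existing path: stats are read and stamped with the table's current schema id.
        return FileMetaUtils.constructFileMeta(format, status, fileIO, table, dir, rollback);
    }
    // Migration path: stats are read with the row type of the schema that
    // actually wrote the file, and that id is stamped into the resulting meta.
    return FileMetaUtils.constructFileMeta(
            format, status, fileIO, table, dir, rollback, icebergSchemaId);
}
```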
File: SchemaManager.java
@@ -769,7 +769,7 @@ protected void updateLastColumn(List<DataField> newFields, String fieldName)
}

@VisibleForTesting
boolean commit(TableSchema newSchema) throws Exception {
public boolean commit(TableSchema newSchema) throws Exception {
SchemaValidation.validateTableSchema(newSchema);
SchemaValidation.validateFallbackBranch(this, newSchema);
Path schemaPath = toSchemaPath(newSchema.id());
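Widening commit(TableSchema) to public is what enables the schema-replay loop in IcebergMigrator shown earlier. A minimal sketch of that call path, assuming contiguous schema ids starting from 0:

```java
// After creating the table with the first schema (id 0), replay every later
// Iceberg-derived schema so the Paimon table's schema history matches.
SchemaManager schemaManager = paimonTable.schemaManager();
for (int i = 1; i < paimonSchemas.size(); i++) {
    // commit validates the schema, then writes the schema file for its id.
    schemaManager.commit(paimonSchemas.get(i));
}
```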