Merged
Changes from all commits
@@ -83,6 +83,9 @@ public static Content fromId(int id) {
private final InternalMap lowerBounds;
private final InternalMap upperBounds;

// only used for iceberg migration
private long schemaId = 0;

IcebergDataFileMeta(
Content content,
String filePath,
@@ -201,6 +204,15 @@ public InternalMap upperBounds() {
return upperBounds;
}

public long schemaId() {
return schemaId;
}

public IcebergDataFileMeta withSchemaId(long schemaId) {
this.schemaId = schemaId;
return this;
}

public static RowType schema(RowType partitionType) {
List<DataField> fields = new ArrayList<>();
fields.add(new DataField(134, "content", DataTypes.INT().notNull()));
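For reference, a minimal sketch of how the new accessor pair is intended to be used; the variable names are illustrative, and the real call site is in the IcebergMigrator change below:

// Tag each migrated data file with the schema id of the manifest it was read from,
// then read the id back when the Paimon DataFileMeta is constructed.
IcebergDataFileMeta taggedFile = entry.file().withSchemaId(manifestSchemaId);
long schemaId = taggedFile.schemaId();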
@@ -18,6 +18,7 @@

package org.apache.paimon.iceberg.migrate;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
@@ -43,19 +44,26 @@
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -64,6 +72,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;

/** Migrates an Iceberg table to a Paimon table. */
@@ -75,6 +84,7 @@ public class IcebergMigrator implements Migrator {
private final Catalog paimonCatalog;
private final String paimonDatabaseName;
private final String paimonTableName;
private final CoreOptions coreOptions;

private final String icebergDatabaseName;
private final String icebergTableName;
@@ -97,10 +107,18 @@ public IcebergMigrator(
String icebergDatabaseName,
String icebergTableName,
Options icebergOptions,
Integer parallelism) {
Integer parallelism,
Map<String, String> options) {
this.paimonCatalog = paimonCatalog;
this.paimonDatabaseName = paimonDatabaseName;
this.paimonTableName = paimonTableName;
this.coreOptions = new CoreOptions(options);
checkArgument(
coreOptions.bucket() == -1,
"Iceberg migrator only support unaware-bucket target table, bucket should be -1");
checkArgument(
!options.containsKey(CoreOptions.PRIMARY_KEY.key()),
"Iceberg migrator does not support define primary key for target table.");

this.icebergDatabaseName = icebergDatabaseName;
this.icebergTableName = icebergTableName;
@@ -136,15 +154,28 @@ public IcebergMigrator(

@Override
public void executeMigrate() throws Exception {
Schema paimonSchema = icebergSchemaToPaimonSchema(icebergMetadata);
List<TableSchema> paimonSchemas = icebergSchemasToPaimonSchemas(icebergMetadata);
Preconditions.checkArgument(
!paimonSchemas.isEmpty(),
"paimon schemas transformed from iceberg table is empty.");
Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableName);

paimonCatalog.createDatabase(paimonDatabaseName, true);
paimonCatalog.createTable(paimonIdentifier, paimonSchema, false);
TableSchema firstSchema = paimonSchemas.get(0);
Preconditions.checkArgument(firstSchema.id() == 0, "Unexpected: the first schema id is not 0.");
paimonCatalog.createTable(paimonIdentifier, firstSchema.toSchema(), false);

try {
FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
FileIO fileIO = paimonTable.fileIO();
SchemaManager schemaManager = paimonTable.schemaManager();
// commit the remaining iceberg schemas (the first one was used to create the table)
for (int i = 1; i < paimonSchemas.size(); i++) {
LOG.info(
"commit new schema from iceberg, new schema id:{}",
paimonSchemas.get(i).id());
schemaManager.commit(paimonSchemas.get(i));
}

IcebergManifestFile manifestFile =
IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
@@ -157,25 +188,36 @@ public void executeMigrate() throws Exception {
// check manifest files with 'DELETE' kind
checkAndFilterManifestFiles(icebergManifestFileMetas);

// get all live iceberg entries
List<IcebergManifestEntry> icebergEntries =
icebergManifestFileMetas.stream()
.flatMap(fileMeta -> manifestFile.read(fileMeta).stream())
.filter(IcebergManifestEntry::isLive)
.collect(Collectors.toList());
if (icebergEntries.isEmpty()) {
Map<Long, List<IcebergManifestEntry>> icebergEntries = new HashMap<>();
for (IcebergManifestFileMeta icebergManifestFileMeta : icebergManifestFileMetas) {
long schemaId =
getSchemaIdFromIcebergManifestFile(
new Path(icebergManifestFileMeta.manifestPath()), fileIO);
List<IcebergManifestEntry> entries = manifestFile.read(icebergManifestFileMeta);
icebergEntries
.computeIfAbsent(schemaId, v -> new ArrayList<>())
.addAll(
entries.stream()
.filter(IcebergManifestEntry::isLive)
.collect(Collectors.toList()));
}

List<IcebergDataFileMeta> icebergDataFileMetas = new ArrayList<>();
// write schema id to IcebergDataFileMeta
for (Map.Entry<Long, List<IcebergManifestEntry>> kv : icebergEntries.entrySet()) {
icebergDataFileMetas.addAll(
kv.getValue().stream()
.map(entry -> entry.file().withSchemaId(kv.getKey()))
.collect(Collectors.toList()));
}

if (icebergDataFileMetas.isEmpty()) {
LOG.info(
"No live manifest entry in iceberg table for snapshot {}, iceberg table meta path is {}.",
"No live iceberg data files in iceberg table for snapshot {}, iceberg table meta path is {}.",
icebergMetadata.currentSnapshotId(),
icebergLatestMetadataLocation);
return;
}

List<IcebergDataFileMeta> icebergDataFileMetas =
icebergEntries.stream()
.map(IcebergManifestEntry::file)
.collect(Collectors.toList());

// Again, check whether delete files exist
checkAndFilterDataFiles(icebergDataFileMetas);

@@ -246,10 +288,21 @@ public void renameTable(boolean ignoreIfNotExists) throws Exception {
paimonCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists);
}

public Schema icebergSchemaToPaimonSchema(IcebergMetadata icebergMetadata) {
// get iceberg current schema
IcebergSchema icebergSchema =
icebergMetadata.schemas().get(icebergMetadata.currentSchemaId());
private List<TableSchema> icebergSchemasToPaimonSchemas(IcebergMetadata icebergMetadata) {
return icebergMetadata.schemas().stream()
.map(
icebergSchema -> {
LOG.info(
"Convert iceberg schema to paimon schema, iceberg schema id: {}",
icebergSchema.schemaId());
return TableSchema.create(
icebergSchema.schemaId(),
icebergSchemaToPaimonSchema(icebergSchema));
})
.collect(Collectors.toList());
}

private Schema icebergSchemaToPaimonSchema(IcebergSchema icebergSchema) {

// get iceberg current partition spec
int currentPartitionSpecId = icebergMetadata.defaultSpecId();
@@ -289,6 +342,18 @@ private void checkAndFilterDataFiles(List<IcebergDataFileMeta> icebergDataFileMetas) {
}
}

private long getSchemaIdFromIcebergManifestFile(Path manifestPath, FileIO fileIO) {

try (DataFileStream<GenericRecord> dataFileStream =
new DataFileStream<>(
fileIO.newInputStream(manifestPath), new GenericDatumReader<>())) {
String schema = dataFileStream.getMetaString("schema");
return JsonSerdeUtil.fromJson(schema, IcebergSchema.class).schemaId();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private static List<DataFileMeta> construct(
List<IcebergDataFileMeta> icebergDataFileMetas,
FileIO fileIO,
@@ -318,7 +383,9 @@ private static DataFileMeta constructFileMeta(
e);
}
String format = icebergDataFileMeta.fileFormat();
return FileMetaUtils.constructFileMeta(format, status, fileIO, table, dir, rollback);
long schemaId = icebergDataFileMeta.schemaId();
return FileMetaUtils.constructFileMeta(
format, status, fileIO, table, dir, rollback, schemaId);
}

private MigrateTask importUnPartitionedTable(
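A rough usage sketch of the extended constructor; the catalog, identifiers and parallelism are placeholders, and the leading parameter order is inferred from the assignments shown above rather than taken from the full signature:

// Hypothetical driver code: migrate one Iceberg table into an unaware-bucket Paimon table.
Map<String, String> targetOptions = new HashMap<>();
targetOptions.put("bucket", "-1"); // required: the target table must be unaware-bucket
// no primary key option: defining a primary key for the target table is rejected

Migrator migrator =
        new IcebergMigrator(
                paimonCatalog,
                "target_db",
                "target_table",
                "iceberg_db",
                "iceberg_table",
                icebergOptions,
                4,
                targetOptions);
migrator.executeMigrate();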
@@ -129,6 +129,47 @@ public static DataFileMeta constructFileMeta(
}
}

public static DataFileMeta constructFileMeta(
String format,
FileStatus fileStatus,
FileIO fileIO,
Table table,
Path dir,
Map<Path, Path> rollback,
long schemaId) {

try {
RowType rowTypeWithSchemaId =
((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType();
SimpleColStatsCollector.Factory[] factories =
StatsCollectorFactories.createStatsFactories(
((FileStoreTable) table).coreOptions(),
rowTypeWithSchemaId.getFieldNames());

SimpleStatsExtractor simpleStatsExtractor =
FileFormat.fromIdentifier(
format,
((FileStoreTable) table).coreOptions().toConfiguration())
.createStatsExtractor(rowTypeWithSchemaId, factories)
.orElseThrow(
() ->
new RuntimeException(
"Can't get table stats extractor for format "
+ format));
Path newPath = renameFile(fileIO, fileStatus.getPath(), dir, format, rollback);
return constructFileMeta(
newPath.getName(),
fileStatus.getLen(),
newPath,
simpleStatsExtractor,
fileIO,
table,
schemaId);
} catch (IOException e) {
throw new RuntimeException("error when construct file meta", e);
}
}

// -----------------------------private method---------------------------------------------

private static Path renameFile(
@@ -152,7 +193,29 @@ private static DataFileMeta constructFileMeta(
FileIO fileIO,
Table table)
throws IOException {
SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(table.rowType());
return constructFileMeta(
fileName,
fileSize,
path,
simpleStatsExtractor,
fileIO,
table,
((FileStoreTable) table).schema().id());
}

private static DataFileMeta constructFileMeta(
String fileName,
long fileSize,
Path path,
SimpleStatsExtractor simpleStatsExtractor,
FileIO fileIO,
Table table,
long schemaId)
throws IOException {
RowType rowTypeWithSchemaId =
((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType();

SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(rowTypeWithSchemaId);

Pair<SimpleColStats[], SimpleStatsExtractor.FileInfo> fileInfo =
simpleStatsExtractor.extractWithFileInfo(fileIO, path);
@@ -165,7 +228,7 @@ private static DataFileMeta constructFileMeta(
stats,
0,
0,
((FileStoreTable) table).schema().id(),
schemaId,
Collections.emptyList(),
null,
FileSource.APPEND,
@@ -769,7 +769,7 @@ protected void updateLastColumn(List<DataField> newFields, String fieldName)
}

@VisibleForTesting
boolean commit(TableSchema newSchema) throws Exception {
public boolean commit(TableSchema newSchema) throws Exception {
SchemaValidation.validateTableSchema(newSchema);
SchemaValidation.validateFallbackBranch(this, newSchema);
Path schemaPath = toSchemaPath(newSchema.id());
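For context, the widened visibility is what allows the migrator to replay historical Iceberg schemas, roughly as in the loop shown above (variable names are illustrative):

// Schema id 0 was already written by createTable; commit the remaining schemas directly.
SchemaManager schemaManager = paimonTable.schemaManager();
for (int i = 1; i < paimonSchemas.size(); i++) {
    schemaManager.commit(paimonSchemas.get(i));
}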