AbstractCatalog.java:

@@ -86,7 +86,8 @@ public Map<String, String> options() {
         return catalogOptions.toMap();
     }
 
-    @Override
+    public abstract String warehouse();
+
     public FileIO fileIO() {
         return fileIO;
     }

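With warehouse() dropped from the Catalog interface (see Catalog.java below) and declared abstract here, each concrete catalog built on AbstractCatalog must now supply its own warehouse root. A minimal sketch of that obligation; the class is hypothetical and kept abstract so the remaining Catalog methods can stay elided, and it assumes AbstractCatalog still offers a protected FileIO-accepting constructor:

import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.fs.FileIO;

// Hypothetical subclass, for illustration only.
public abstract class ExampleFileSystemCatalog extends AbstractCatalog {

    private final String warehousePath;

    protected ExampleFileSystemCatalog(FileIO fileIO, String warehousePath) {
        super(fileIO); // assumption: an AbstractCatalog(FileIO) constructor remains available
        this.warehousePath = warehousePath;
    }

    @Override
    public String warehouse() {
        return warehousePath;
    }
}
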
Catalog.java:

@@ -19,7 +19,6 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.annotation.Public;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -368,12 +367,6 @@ default void repairTable(Identifier identifier) throws TableNotExistException {
 
     // ==================== Catalog Information ==========================
 
-    /** Warehouse root path for creating new databases. */
-    String warehouse();
-
-    /** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */
-    FileIO fileIO();
-
     /** Catalog options for re-creating this catalog. */
     Map<String, String> options();
 

DelegateCatalog.java:

@@ -18,7 +18,6 @@
 
 package org.apache.paimon.catalog;
 
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -46,21 +45,11 @@ public boolean caseSensitive() {
         return wrapped.caseSensitive();
     }
 
-    @Override
-    public String warehouse() {
-        return wrapped.warehouse();
-    }
-
     @Override
     public Map<String, String> options() {
         return wrapped.options();
     }
 
-    @Override
-    public FileIO fileIO() {
-        return wrapped.fileIO();
-    }
-
     @Override
     public List<String> listDatabases() {
         return wrapped.listDatabases();
@@ -200,4 +189,11 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
     public void close() throws Exception {
         wrapped.close();
     }
+
+    public static Catalog rootCatalog(Catalog catalog) {
+        while (catalog instanceof DelegateCatalog) {
+            catalog = ((DelegateCatalog) catalog).wrapped();
+        }
+        return catalog;
+    }
 }

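The new rootCatalog helper lets callers that previously relied on Catalog.warehouse() unwrap any chain of delegating catalogs and reach the underlying implementation. A minimal sketch, assuming the root of the chain is an AbstractCatalog (names are illustrative):

import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.DelegateCatalog;

public class RootCatalogExample {

    // Returns the warehouse path if the unwrapped catalog exposes one.
    static String warehouseOf(Catalog catalog) {
        Catalog root = DelegateCatalog.rootCatalog(catalog);
        if (root instanceof AbstractCatalog) {
            return ((AbstractCatalog) root).warehouse();
        }
        throw new IllegalStateException("catalog has no warehouse: " + root.getClass());
    }
}
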
IcebergMigrateHadoopMetadata.java:

@@ -18,6 +18,7 @@
 
 package org.apache.paimon.iceberg.migrate;
 
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -38,16 +39,14 @@ public class IcebergMigrateHadoopMetadata implements IcebergMigrateMetadata {
     private static final String VERSION_HINT_FILENAME = "version-hint.text";
     private static final String ICEBERG_WAREHOUSE = "iceberg_warehouse";
 
-    private final FileIO fileIO;
     private final Identifier icebergIdentifier;
     private final Options icebergOptions;
 
     private Path icebergLatestMetaVersionPath;
     private IcebergPathFactory icebergMetaPathFactory;
+    private FileIO fileIO;
 
-    public IcebergMigrateHadoopMetadata(
-            Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
-        this.fileIO = fileIO;
+    public IcebergMigrateHadoopMetadata(Identifier icebergIdentifier, Options icebergOptions) {
         this.icebergIdentifier = icebergIdentifier;
         this.icebergOptions = icebergOptions;
     }
@@ -58,15 +57,19 @@ public IcebergMetadata icebergMetadata() {
                 icebergOptions.get(ICEBERG_WAREHOUSE) != null,
                 "'iceberg_warehouse' is null. "
                         + "In hadoop-catalog, you should explicitly set this argument for finding iceberg metadata.");
+        Path path =
+                new Path(
+                        String.format(
+                                "%s/%s/metadata",
+                                icebergIdentifier.getDatabaseName(),
+                                icebergIdentifier.getTableName()));
+        try {
+            fileIO = FileIO.get(path, CatalogContext.create(icebergOptions));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
         this.icebergMetaPathFactory =
-                new IcebergPathFactory(
-                        new Path(
-                                icebergOptions.get(ICEBERG_WAREHOUSE),
-                                new Path(
-                                        String.format(
-                                                "%s/%s/metadata",
-                                                icebergIdentifier.getDatabaseName(),
-                                                icebergIdentifier.getTableName()))));
+                new IcebergPathFactory(new Path(icebergOptions.get(ICEBERG_WAREHOUSE), path));
         long icebergLatestMetaVersion = getIcebergLatestMetaVersion();
 
         this.icebergLatestMetaVersionPath =

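Call sites no longer pass a FileIO: the Hadoop migrate metadata now derives one itself via FileIO.get(path, CatalogContext.create(icebergOptions)) when icebergMetadata() is invoked. A minimal sketch of the new construction path; the warehouse value and identifier are illustrative, and Options.set(String, String) is assumed from the Paimon options API:

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.iceberg.migrate.IcebergMigrateHadoopMetadata;
import org.apache.paimon.options.Options;

public class MigrateMetadataExample {

    public static void main(String[] args) {
        Options icebergOptions = new Options();
        // Required in hadoop-catalog mode, per the checkArgument above.
        icebergOptions.set("iceberg_warehouse", "hdfs:///warehouse/iceberg");

        IcebergMigrateHadoopMetadata metadata =
                new IcebergMigrateHadoopMetadata(
                        Identifier.create("iceberg_db", "iceberg_table"), icebergOptions);
        // FileIO is resolved internally from the options; no fileIO argument needed.
        System.out.println(metadata.icebergMetadata());
    }
}
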
IcebergMigrateHadoopMetadataFactory.java:

@@ -19,7 +19,6 @@
 package org.apache.paimon.iceberg.migrate;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.iceberg.IcebergOptions;
 import org.apache.paimon.options.Options;
 
@@ -28,12 +27,12 @@ public class IcebergMigrateHadoopMetadataFactory implements IcebergMigrateMetadataFactory {
 
     @Override
     public String identifier() {
-        return IcebergOptions.StorageType.HADOOP_CATALOG.toString() + "_migrate";
+        return IcebergOptions.StorageType.HADOOP_CATALOG + "_migrate";
     }
 
     @Override
     public IcebergMigrateHadoopMetadata create(
-            Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
-        return new IcebergMigrateHadoopMetadata(icebergIdentifier, fileIO, icebergOptions);
+            Identifier icebergIdentifier, Options icebergOptions) {
+        return new IcebergMigrateHadoopMetadata(icebergIdentifier, icebergOptions);
     }
 }

IcebergMigrateMetadataFactory.java:

@@ -20,12 +20,10 @@
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.factories.Factory;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.options.Options;
 
 /** Factory to create {@link IcebergMigrateMetadata}. */
 public interface IcebergMigrateMetadataFactory extends Factory {
 
-    IcebergMigrateMetadata create(
-            Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions);
+    IcebergMigrateMetadata create(Identifier icebergIdentifier, Options icebergOptions);
 }

IcebergMigrator.java:

@@ -73,7 +73,6 @@ public class IcebergMigrator implements Migrator {
     private final ThreadPoolExecutor executor;
 
     private final Catalog paimonCatalog;
-    private final FileIO paimonFileIO;
     private final String paimonDatabaseName;
     private final String paimonTableName;
 
@@ -100,7 +99,6 @@ public IcebergMigrator(
             Options icebergOptions,
             Integer parallelism) {
         this.paimonCatalog = paimonCatalog;
-        this.paimonFileIO = paimonCatalog.fileIO();
         this.paimonDatabaseName = paimonDatabaseName;
         this.paimonTableName = paimonTableName;
 
@@ -126,9 +124,7 @@
 
         icebergMigrateMetadata =
                 icebergMigrateMetadataFactory.create(
-                        Identifier.create(icebergDatabaseName, icebergTableName),
-                        paimonFileIO,
-                        icebergOptions);
+                        Identifier.create(icebergDatabaseName, icebergTableName), icebergOptions);
 
         this.icebergMetadata = icebergMigrateMetadata.icebergMetadata();
         this.icebergLatestMetadataLocation = icebergMigrateMetadata.icebergLatestMetadataLocation();
@@ -148,6 +144,7 @@ public void executeMigrate() throws Exception {
 
         try {
             FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+            FileIO fileIO = paimonTable.fileIO();
 
             IcebergManifestFile manifestFile =
                     IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
@@ -214,8 +211,8 @@ public void executeMigrate() throws Exception {
             for (Map.Entry<Path, Path> entry : rollback.entrySet()) {
                 Path newPath = entry.getKey();
                 Path origin = entry.getValue();
-                if (paimonFileIO.exists(newPath)) {
-                    paimonFileIO.rename(newPath, origin);
+                if (fileIO.exists(newPath)) {
+                    fileIO.rename(newPath, origin);
                 }
             }
 
@@ -331,8 +328,7 @@ private MigrateTask importUnPartitionedTable(
         BinaryRow partitionRow = BinaryRow.EMPTY_ROW;
         Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
 
-        return new MigrateTask(
-                icebergDataFileMetas, paimonFileIO, paimonTable, partitionRow, newDir, rollback);
+        return new MigrateTask(icebergDataFileMetas, paimonTable, partitionRow, newDir, rollback);
     }
 
     private List<MigrateTask> importPartitionedTable(
@@ -347,13 +343,7 @@ private List<MigrateTask> importPartitionedTable(
             BinaryRow partitionRow = entry.getKey();
             Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
             migrateTasks.add(
-                    new MigrateTask(
-                            entry.getValue(),
-                            paimonFileIO,
-                            paimonTable,
-                            partitionRow,
-                            newDir,
-                            rollback));
+                    new MigrateTask(entry.getValue(), paimonTable, partitionRow, newDir, rollback));
         }
         return migrateTasks;
     }
@@ -362,21 +352,18 @@ private List<MigrateTask> importPartitionedTable(
     public static class MigrateTask implements Callable<CommitMessage> {
 
         private final List<IcebergDataFileMeta> icebergDataFileMetas;
-        private final FileIO fileIO;
         private final FileStoreTable paimonTable;
         private final BinaryRow partitionRow;
         private final Path newDir;
        private final Map<Path, Path> rollback;
 
         public MigrateTask(
                 List<IcebergDataFileMeta> icebergDataFileMetas,
-                FileIO fileIO,
                 FileStoreTable paimonTable,
                 BinaryRow partitionRow,
                 Path newDir,
                 Map<Path, Path> rollback) {
             this.icebergDataFileMetas = icebergDataFileMetas;
-            this.fileIO = fileIO;
             this.paimonTable = paimonTable;
             this.partitionRow = partitionRow;
             this.newDir = newDir;
@@ -385,6 +372,7 @@ public MigrateTask(
 
         @Override
         public CommitMessage call() throws Exception {
+            FileIO fileIO = paimonTable.fileIO();
             if (!fileIO.exists(newDir)) {
                 fileIO.mkdirs(newDir);
             }

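The migrator and its MigrateTask now obtain FileIO from the table rather than from the catalog, which keeps working now that the catalog no longer exposes a global FileIO. A minimal sketch of the per-table access pattern used above (identifiers and the path are illustrative):

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;

public class PerTableFileIOExample {

    static boolean bucketDirExists(Catalog paimonCatalog) throws Exception {
        FileStoreTable table =
                (FileStoreTable) paimonCatalog.getTable(Identifier.create("db", "tbl"));
        FileIO fileIO = table.fileIO(); // per-table FileIO, as in executeMigrate()
        return fileIO.exists(new Path("/tmp/example"));
    }
}
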
LocalOrphanFilesClean.java:

@@ -27,7 +27,6 @@
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.SerializableConsumer;
 
 import javax.annotation.Nullable;
 
@@ -81,15 +80,11 @@ public LocalOrphanFilesClean(FileStoreTable table) {
     }
 
     public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis) {
-        this(table, olderThanMillis, path -> table.fileIO().deleteQuietly(path), false);
+        this(table, olderThanMillis, false);
     }
 
-    public LocalOrphanFilesClean(
-            FileStoreTable table,
-            long olderThanMillis,
-            SerializableConsumer<Path> fileCleaner,
-            boolean dryRun) {
-        super(table, olderThanMillis, fileCleaner);
+    public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis, boolean dryRun) {
+        super(table, olderThanMillis, dryRun);
         this.deleteFiles = new ArrayList<>();
         this.executor =
                 createCachedThreadPool(
@@ -125,7 +120,7 @@ public CleanOrphanFilesResult clean()
                 .forEach(
                         deleteFileInfo -> {
                             deletedFilesLenInBytes.addAndGet(deleteFileInfo.getRight());
-                            fileCleaner.accept(deleteFileInfo.getLeft());
+                            cleanFile(deleteFileInfo.getLeft());
                         });
         deleteFiles.addAll(
                 candidateDeletes.stream()
@@ -239,7 +234,6 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
             String databaseName,
             @Nullable String tableName,
             long olderThanMillis,
-            SerializableConsumer<Path> fileCleaner,
             @Nullable Integer parallelism,
             boolean dryRun)
             throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
@@ -269,8 +263,7 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
                             table.getClass().getName());
 
             orphanFilesCleans.add(
-                    new LocalOrphanFilesClean(
-                            (FileStoreTable) table, olderThanMillis, fileCleaner, dryRun));
+                    new LocalOrphanFilesClean((FileStoreTable) table, olderThanMillis, dryRun));
         }
 
         return orphanFilesCleans;
@@ -281,19 +274,12 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             String databaseName,
             @Nullable String tableName,
             long olderThanMillis,
-            SerializableConsumer<Path> fileCleaner,
             @Nullable Integer parallelism,
             boolean dryRun)
             throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
         List<LocalOrphanFilesClean> tableCleans =
                 createOrphanFilesCleans(
-                        catalog,
-                        databaseName,
-                        tableName,
-                        olderThanMillis,
-                        fileCleaner,
-                        parallelism,
-                        dryRun);
+                        catalog, databaseName, tableName, olderThanMillis, parallelism, dryRun);
 
         ExecutorService executorService =
                 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

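With the SerializableConsumer<Path> fileCleaner removed, deletion behavior is controlled solely by the dryRun flag, and actual removal goes through the inherited cleanFile(...). A minimal usage sketch; the package of LocalOrphanFilesClean and the throws clause of clean() are assumptions here:

import org.apache.paimon.operation.LocalOrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;

public class OrphanCleanExample {

    static void report(FileStoreTable table) throws Exception {
        long olderThanMillis = System.currentTimeMillis() - 24 * 60 * 60 * 1000L;
        // dryRun = true: only report orphan files, do not delete them.
        LocalOrphanFilesClean clean = new LocalOrphanFilesClean(table, olderThanMillis, true);
        System.out.println(clean.clean());
    }
}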