diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 7a72da38e76f..508d8de625fa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -86,7 +86,8 @@ public Map<String, String> options() {
         return catalogOptions.toMap();
     }
 
-    @Override
+    public abstract String warehouse();
+
     public FileIO fileIO() {
         return fileIO;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index d0ad86c224d7..e03cbc3ac8e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.annotation.Public;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -368,12 +367,6 @@ default void repairTable(Identifier identifier) throws TableNotExistException {
 
     // ==================== Catalog Information ==========================
 
-    /** Warehouse root path for creating new databases. */
-    String warehouse();
-
-    /** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */
-    FileIO fileIO();
-
     /** Catalog options for re-creating this catalog. */
     Map<String, String> options();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index aa7852456e5a..2d8d3f3e1250 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -18,7 +18,6 @@
 package org.apache.paimon.catalog;
 
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -46,21 +45,11 @@ public boolean caseSensitive() {
         return wrapped.caseSensitive();
     }
 
-    @Override
-    public String warehouse() {
-        return wrapped.warehouse();
-    }
-
     @Override
     public Map<String, String> options() {
         return wrapped.options();
     }
 
-    @Override
-    public FileIO fileIO() {
-        return wrapped.fileIO();
-    }
-
     @Override
     public List<String> listDatabases() {
         return wrapped.listDatabases();
@@ -200,4 +189,11 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
     public void close() throws Exception {
         wrapped.close();
     }
+
+    public static Catalog rootCatalog(Catalog catalog) {
+        while (catalog instanceof DelegateCatalog) {
+            catalog = ((DelegateCatalog) catalog).wrapped();
+        }
+        return catalog;
+    }
 }
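With warehouse() gone from the Catalog interface (it now lives on AbstractCatalog, next to fileIO()), code that holds a possibly wrapped catalog has to unwrap it before asking for either. A minimal sketch of the resulting call pattern, mirroring the PrivilegedCatalog changes later in this patch (variable names are illustrative):

    Catalog root = DelegateCatalog.rootCatalog(catalog);
    if (root instanceof AbstractCatalog) {
        AbstractCatalog abstractCatalog = (AbstractCatalog) root;
        String warehouse = abstractCatalog.warehouse(); // warehouse root path
        FileIO fileIO = abstractCatalog.fileIO(); // FileIO that can access that path
    }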
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java
index a6c5fb027ba2..1991be34e926 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java
@@ -18,6 +18,7 @@
 package org.apache.paimon.iceberg.migrate;
 
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -38,16 +39,14 @@ public class IcebergMigrateHadoopMetadata implements IcebergMigrateMetadata {
     private static final String VERSION_HINT_FILENAME = "version-hint.text";
     private static final String ICEBERG_WAREHOUSE = "iceberg_warehouse";
 
-    private final FileIO fileIO;
     private final Identifier icebergIdentifier;
     private final Options icebergOptions;
 
     private Path icebergLatestMetaVersionPath;
     private IcebergPathFactory icebergMetaPathFactory;
+    private FileIO fileIO;
 
-    public IcebergMigrateHadoopMetadata(
-            Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
-        this.fileIO = fileIO;
+    public IcebergMigrateHadoopMetadata(Identifier icebergIdentifier, Options icebergOptions) {
         this.icebergIdentifier = icebergIdentifier;
         this.icebergOptions = icebergOptions;
     }
@@ -58,15 +57,19 @@ public IcebergMetadata icebergMetadata() {
                 icebergOptions.get(ICEBERG_WAREHOUSE) != null,
                 "'iceberg_warehouse' is null. "
                         + "In hadoop-catalog, you should explicitly set this argument for finding iceberg metadata.");
+        Path path =
+                new Path(
+                        String.format(
+                                "%s/%s/metadata",
+                                icebergIdentifier.getDatabaseName(),
+                                icebergIdentifier.getTableName()));
+        try {
+            fileIO = FileIO.get(path, CatalogContext.create(icebergOptions));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
         this.icebergMetaPathFactory =
-                new IcebergPathFactory(
-                        new Path(
-                                icebergOptions.get(ICEBERG_WAREHOUSE),
-                                new Path(
-                                        String.format(
-                                                "%s/%s/metadata",
-                                                icebergIdentifier.getDatabaseName(),
-                                                icebergIdentifier.getTableName()))));
+                new IcebergPathFactory(new Path(icebergOptions.get(ICEBERG_WAREHOUSE), path));
 
         long icebergLatestMetaVersion = getIcebergLatestMetaVersion();
 
         this.icebergLatestMetaVersionPath =
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java
index 666630101445..50857f1d3753 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.iceberg.migrate;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.iceberg.IcebergOptions;
 import org.apache.paimon.options.Options;
 
@@ -28,12 +27,12 @@ public class IcebergMigrateHadoopMetadataFactory implements IcebergMigrateMetada
 
     @Override
     public String identifier() {
-        return IcebergOptions.StorageType.HADOOP_CATALOG.toString() + "_migrate";
+        return IcebergOptions.StorageType.HADOOP_CATALOG + "_migrate";
     }
 
     @Override
     public IcebergMigrateHadoopMetadata create(
-            Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
-        return new IcebergMigrateHadoopMetadata(icebergIdentifier, fileIO, icebergOptions);
+            Identifier icebergIdentifier, Options icebergOptions) {
+        return new IcebergMigrateHadoopMetadata(icebergIdentifier, icebergOptions);
     }
 }
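The Hadoop-catalog migration path no longer borrows the Paimon catalog's FileIO; it resolves one itself from the user-supplied options via FileIO.get, as the hunk above shows. A hedged sketch of that resolution step in isolation, assuming icebergOptions carries the filesystem configuration FileIO.get needs (the path literal is illustrative):

    // Resolve a FileIO for an Iceberg metadata directory from options alone.
    Path metadataPath = new Path("mydb/mytbl/metadata");
    FileIO icebergFileIO;
    try {
        icebergFileIO = FileIO.get(metadataPath, CatalogContext.create(icebergOptions));
    } catch (IOException e) {
        throw new RuntimeException(e);
    }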
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java
index f727088f5d11..3baec4404eec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java
@@ -20,12 +20,10 @@
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.factories.Factory;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.options.Options;
 
 /** Factory to create {@link IcebergMigrateMetadata}. */
 public interface IcebergMigrateMetadataFactory extends Factory {
 
-    IcebergMigrateMetadata create(
-            Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions);
+    IcebergMigrateMetadata create(Identifier icebergIdentifier, Options icebergOptions);
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
index 44162dea7fc3..9e91fa2d18a8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -73,7 +73,6 @@ public class IcebergMigrator implements Migrator {
     private final ThreadPoolExecutor executor;
 
     private final Catalog paimonCatalog;
-    private final FileIO paimonFileIO;
     private final String paimonDatabaseName;
     private final String paimonTableName;
 
@@ -100,7 +99,6 @@ public IcebergMigrator(
             Options icebergOptions,
             Integer parallelism) {
         this.paimonCatalog = paimonCatalog;
-        this.paimonFileIO = paimonCatalog.fileIO();
         this.paimonDatabaseName = paimonDatabaseName;
         this.paimonTableName = paimonTableName;
 
@@ -126,9 +124,7 @@ public IcebergMigrator(
 
         icebergMigrateMetadata =
                 icebergMigrateMetadataFactory.create(
-                        Identifier.create(icebergDatabaseName, icebergTableName),
-                        paimonFileIO,
-                        icebergOptions);
+                        Identifier.create(icebergDatabaseName, icebergTableName), icebergOptions);
         this.icebergMetadata = icebergMigrateMetadata.icebergMetadata();
         this.icebergLatestMetadataLocation =
                 icebergMigrateMetadata.icebergLatestMetadataLocation();
@@ -148,6 +144,7 @@ public void executeMigrate() throws Exception {
 
         try {
             FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+            FileIO fileIO = paimonTable.fileIO();
 
             IcebergManifestFile manifestFile =
                     IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
@@ -214,8 +211,8 @@ public void executeMigrate() throws Exception {
             for (Map.Entry<Path, Path> entry : rollback.entrySet()) {
                 Path newPath = entry.getKey();
                 Path origin = entry.getValue();
-                if (paimonFileIO.exists(newPath)) {
-                    paimonFileIO.rename(newPath, origin);
+                if (fileIO.exists(newPath)) {
+                    fileIO.rename(newPath, origin);
                 }
             }
 
@@ -331,8 +328,7 @@ private MigrateTask importUnPartitionedTable(
         BinaryRow partitionRow = BinaryRow.EMPTY_ROW;
         Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
 
-        return new MigrateTask(
-                icebergDataFileMetas, paimonFileIO, paimonTable, partitionRow, newDir, rollback);
+        return new MigrateTask(icebergDataFileMetas, paimonTable, partitionRow, newDir, rollback);
     }
 
     private List<MigrateTask> importPartitionedTable(
@@ -347,13 +343,7 @@ private List<MigrateTask> importPartitionedTable(
             BinaryRow partitionRow = entry.getKey();
             Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
             migrateTasks.add(
-                    new MigrateTask(
-                            entry.getValue(),
-                            paimonFileIO,
-                            paimonTable,
-                            partitionRow,
-                            newDir,
-                            rollback));
+                    new MigrateTask(entry.getValue(), paimonTable, partitionRow, newDir, rollback));
         }
         return migrateTasks;
     }
@@ -362,7 +352,6 @@ private List<MigrateTask> importPartitionedTable(
     public static class MigrateTask implements Callable<CommitMessage> {
 
         private final List<IcebergDataFileMeta> icebergDataFileMetas;
-        private final FileIO fileIO;
         private final FileStoreTable paimonTable;
         private final BinaryRow partitionRow;
         private final Path newDir;
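With the paimonFileIO field removed, neither the metadata factory nor MigrateTask needs a FileIO argument: the factory derives one from the Iceberg options, and each task re-derives one at call time from the FileStoreTable it already carries. An illustrative call site for the slimmed-down factory (database and table names are made up):

    IcebergMigrateMetadata metadata =
            icebergMigrateMetadataFactory.create(
                    Identifier.create("iceberg_db", "iceberg_tbl"), icebergOptions);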
@@ -370,13 +359,11 @@ public static class MigrateTask implements Callable<CommitMessage> {
 
         public MigrateTask(
                 List<IcebergDataFileMeta> icebergDataFileMetas,
-                FileIO fileIO,
                 FileStoreTable paimonTable,
                 BinaryRow partitionRow,
                 Path newDir,
                 Map<Path, Path> rollback) {
             this.icebergDataFileMetas = icebergDataFileMetas;
-            this.fileIO = fileIO;
             this.paimonTable = paimonTable;
             this.partitionRow = partitionRow;
             this.newDir = newDir;
@@ -385,6 +372,7 @@ public MigrateTask(
 
         @Override
         public CommitMessage call() throws Exception {
+            FileIO fileIO = paimonTable.fileIO();
             if (!fileIO.exists(newDir)) {
                 fileIO.mkdirs(newDir);
             }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 8f7767047230..fc7895b1e2d1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -27,7 +27,6 @@
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.SerializableConsumer;
 
 import javax.annotation.Nullable;
 
@@ -81,15 +80,11 @@ public LocalOrphanFilesClean(FileStoreTable table) {
     }
 
     public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis) {
-        this(table, olderThanMillis, path -> table.fileIO().deleteQuietly(path), false);
+        this(table, olderThanMillis, false);
     }
 
-    public LocalOrphanFilesClean(
-            FileStoreTable table,
-            long olderThanMillis,
-            SerializableConsumer<Path> fileCleaner,
-            boolean dryRun) {
-        super(table, olderThanMillis, fileCleaner);
+    public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis, boolean dryRun) {
+        super(table, olderThanMillis, dryRun);
         this.deleteFiles = new ArrayList<>();
         this.executor =
                 createCachedThreadPool(
@@ -125,7 +120,7 @@ public CleanOrphanFilesResult clean()
                 .forEach(
                         deleteFileInfo -> {
                             deletedFilesLenInBytes.addAndGet(deleteFileInfo.getRight());
-                            fileCleaner.accept(deleteFileInfo.getLeft());
+                            cleanFile(deleteFileInfo.getLeft());
                         });
         deleteFiles.addAll(
                 candidateDeletes.stream()
@@ -239,7 +234,6 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
             String databaseName,
             @Nullable String tableName,
             long olderThanMillis,
-            SerializableConsumer<Path> fileCleaner,
             @Nullable Integer parallelism,
             boolean dryRun)
             throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
@@ -269,8 +263,7 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
                             table.getClass().getName());
 
             orphanFilesCleans.add(
-                    new LocalOrphanFilesClean(
-                            (FileStoreTable) table, olderThanMillis, fileCleaner, dryRun));
+                    new LocalOrphanFilesClean((FileStoreTable) table, olderThanMillis, dryRun));
         }
 
         return orphanFilesCleans;
@@ -281,19 +274,12 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             String databaseName,
             @Nullable String tableName,
             long olderThanMillis,
-            SerializableConsumer<Path> fileCleaner,
             @Nullable Integer parallelism,
             boolean dryRun)
             throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
         List<LocalOrphanFilesClean> tableCleans =
                 createOrphanFilesCleans(
-                        catalog,
-                        databaseName,
-                        tableName,
-                        olderThanMillis,
-                        fileCleaner,
-                        parallelism,
-                        dryRun);
+                        catalog, databaseName, tableName, olderThanMillis, parallelism, dryRun);
 
         ExecutorService executorService =
                 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
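Orphan-file cleaning now takes a plain dryRun flag instead of an injected SerializableConsumer<Path>. A usage sketch of the simplified constructor (exception handling omitted; variable names are illustrative):

    // Preview: report orphan files without deleting anything.
    LocalOrphanFilesClean preview =
            new LocalOrphanFilesClean(fileStoreTable, olderThanMillis, true);
    CleanOrphanFilesResult previewResult = preview.clean();

    // Real run: delete orphan files older than the threshold.
    LocalOrphanFilesClean cleaner =
            new LocalOrphanFilesClean(fileStoreTable, olderThanMillis, false);
    CleanOrphanFilesResult result = cleaner.clean();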
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 63214253d4f3..03e20ae4f901 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
@@ -36,7 +35,6 @@
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.SerializableConsumer;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.SupplierWithIOException;
 import org.apache.paimon.utils.TagManager;
@@ -90,18 +88,17 @@ public abstract class OrphanFilesClean implements Serializable {
     protected final FileStoreTable table;
     protected final FileIO fileIO;
     protected final long olderThanMillis;
-    protected final SerializableConsumer<Path> fileCleaner;
+    protected final boolean dryRun;
     protected final int partitionKeysNum;
     protected final Path location;
 
-    public OrphanFilesClean(
-            FileStoreTable table, long olderThanMillis, SerializableConsumer<Path> fileCleaner) {
+    public OrphanFilesClean(FileStoreTable table, long olderThanMillis, boolean dryRun) {
         this.table = table;
         this.fileIO = table.fileIO();
         this.partitionKeysNum = table.partitionKeys().size();
         this.location = table.location();
         this.olderThanMillis = olderThanMillis;
-        this.fileCleaner = fileCleaner;
+        this.dryRun = dryRun;
     }
 
     protected List<String> validBranches() {
@@ -163,7 +160,20 @@ private void cleanFile(
         Long fileSize = deleteFileInfo.getRight();
         deletedFilesConsumer.accept(filePath);
         deletedFilesLenInBytesConsumer.accept(fileSize);
-        fileCleaner.accept(filePath);
+        cleanFile(filePath);
+    }
+
+    protected void cleanFile(Path path) {
+        if (!dryRun) {
+            try {
+                if (fileIO.isDir(path)) {
+                    fileIO.deleteDirectoryQuietly(path);
+                } else {
+                    fileIO.deleteQuietly(path);
+                }
+            } catch (IOException ignored) {
+            }
+        }
     }
 
     protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException {
@@ -362,28 +372,6 @@ protected boolean oldEnough(FileStatus status) {
         return status.getModificationTime() < olderThanMillis;
     }
 
-    public static SerializableConsumer<Path> createFileCleaner(
-            Catalog catalog, @Nullable Boolean dryRun) {
-        SerializableConsumer<Path> fileCleaner;
-        if (Boolean.TRUE.equals(dryRun)) {
-            fileCleaner = path -> {};
-        } else {
-            FileIO fileIO = catalog.fileIO();
-            fileCleaner =
-                    path -> {
-                        try {
-                            if (fileIO.isDir(path)) {
-                                fileIO.deleteDirectoryQuietly(path);
-                            } else {
-                                fileIO.deleteQuietly(path);
-                            }
-                        } catch (IOException ignored) {
-                        }
-                    };
-        }
-        return fileCleaner;
-    }
-
     public static long olderThanMillis(@Nullable String olderThan) {
         if (isNullOrWhitespaceOnly(olderThan)) {
             return System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
@@ -412,10 +400,20 @@ protected void tryCleanDataDirectory(Set<Path> dataDirs, int maxLevel) {
     }
 
     public boolean tryDeleteEmptyDirectory(Path path) {
+        if (dryRun) {
+            return false;
+        }
+
         try {
             return fileIO.delete(path, false);
         } catch (IOException e) {
             return false;
         }
     }
+
+    /** Cleaner to clean files. */
+    public interface FileCleaner extends Serializable {
+
+        void clean(String table, Path path);
+    }
 }
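The deletion policy that createFileCleaner used to assemble is now baked into the base class: cleanFile(path) is a no-op under dryRun, quietly deletes a directory when the path is a directory, and quietly deletes the file otherwise; tryDeleteEmptyDirectory short-circuits under dryRun the same way. A subclass therefore only needs calls like:

    // Inside any OrphanFilesClean subclass; both calls respect the dryRun flag.
    cleanFile(orphanPath);
    tryDeleteEmptyDirectory(bucketDir);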
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
index acbd15a634ee..875542667000 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -18,6 +18,7 @@
 package org.apache.paimon.privilege;
 
+import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.DelegateCatalog;
@@ -56,10 +57,14 @@ public PrivilegedCatalog(Catalog wrapped, PrivilegeManagerLoader privilegeManage
     }
 
     public static Catalog tryToCreate(Catalog catalog, Options options) {
+        if (!(rootCatalog(catalog) instanceof AbstractCatalog)) {
+            return catalog;
+        }
+
         FileBasedPrivilegeManagerLoader fileBasedPrivilegeManagerLoader =
                 new FileBasedPrivilegeManagerLoader(
-                        catalog.warehouse(),
-                        catalog.fileIO(),
+                        ((AbstractCatalog) rootCatalog(catalog)).warehouse(),
+                        ((AbstractCatalog) rootCatalog(catalog)).fileIO(),
                         options.get(PrivilegedCatalog.USER),
                         options.get(PrivilegedCatalog.PASSWORD));
         FileBasedPrivilegeManager fileBasedPrivilegeManager =
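tryToCreate now degrades gracefully: if the unwrapped catalog is not an AbstractCatalog (for example a REST catalog, which no longer exposes warehouse() or fileIO()), the input catalog is returned unchanged instead of failing. Illustrative use:

    // Wraps filesystem-backed catalogs with privileges; returns others untouched.
    Catalog maybePrivileged = PrivilegedCatalog.tryToCreate(catalog, options);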
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 9c8100662b9d..abfdb7b35138 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -144,11 +144,6 @@ public RESTCatalog(CatalogContext context, boolean configRequired) {
         this.fileIO = dataTokenEnabled ? null : fileIOFromOptions(new Path(options.get(WAREHOUSE)));
     }
 
-    @Override
-    public String warehouse() {
-        return context.options().get(WAREHOUSE);
-    }
-
     @Override
     public Map<String, String> options() {
         return context.options().toMap();
@@ -159,15 +154,6 @@ public RESTCatalogLoader catalogLoader() {
         return new RESTCatalogLoader(context);
     }
 
-    @Override
-    public FileIO fileIO() {
-        // TODO remove Catalog.fileIO
-        if (dataTokenEnabled) {
-            throw new UnsupportedOperationException();
-        }
-        return fileIO;
-    }
-
     @Override
     public List<String> listDatabases() {
         ListDatabasesResponse response =
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 05f3adb3c419..ce3c87c4c6d4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -88,7 +88,7 @@ public RESTCatalogServer(String warehouse, String initToken) {
         Options conf = new Options();
         conf.setString("warehouse", warehouse);
         this.catalog = TestRESTCatalog.create(CatalogContext.create(conf));
-        this.dispatcher = initDispatcher(catalog, authToken);
+        this.dispatcher = initDispatcher(catalog, warehouse, authToken);
         MockWebServer mockWebServer = new MockWebServer();
         mockWebServer.setDispatcher(dispatcher);
         server = mockWebServer;
@@ -106,7 +106,7 @@ public void shutdown() throws IOException {
         server.shutdown();
     }
 
-    public static Dispatcher initDispatcher(Catalog catalog, String authToken) {
+    public static Dispatcher initDispatcher(Catalog catalog, String warehouse, String authToken) {
         return new Dispatcher() {
             @Override
             public MockResponse dispatch(RecordedRequest request) {
@@ -119,7 +119,7 @@ public MockResponse dispatch(RecordedRequest request) {
                 if ("/v1/config".equals(request.getPath())) {
                     return new MockResponse()
                             .setResponseCode(200)
-                            .setBody(getConfigBody(catalog.warehouse()));
+                            .setBody(getConfigBody(warehouse));
                 } else if (DATABASE_URI.equals(request.getPath())) {
                     return databasesApiHandler(catalog, request);
                 } else if (request.getPath().startsWith(DATABASE_URI)) {
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 4f8217ffce40..70f797b03d1c 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -27,7 +27,6 @@
 
 import java.util.Locale;
 
-import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner;
 import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;
 
 /**
@@ -97,7 +96,7 @@ public String[] call(
                                 procedureContext.getExecutionEnvironment(),
                                 catalog,
                                 olderThanMillis(olderThan),
-                                createFileCleaner(catalog, dryRun),
+                                dryRun,
                                 parallelism,
                                 databaseName,
                                 tableName);
@@ -109,7 +108,6 @@ public String[] call(
                                 databaseName,
                                 tableName,
                                 olderThanMillis(olderThan),
-                                createFileCleaner(catalog, dryRun),
                                 parallelism,
                                 dryRun);
                 break;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
index 318089b30be5..030d76fc62cb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
@@ -23,7 +23,6 @@
 import java.util.Map;
 
 import static org.apache.paimon.flink.orphan.FlinkOrphanFilesClean.executeDatabaseOrphanFiles;
-import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner;
 import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;
 
 /** Action to remove the orphan data files and metadata files. */
@@ -61,7 +60,7 @@ public void run() throws Exception {
                 env,
                 catalog,
                 olderThanMillis(olderThan),
-                createFileCleaner(catalog, dryRun),
+                dryRun,
                 parallelism == null ? null : Integer.parseInt(parallelism),
                 databaseName,
                 tableName);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
index e7002cce1eec..040718ad87d6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
@@ -25,6 +25,7 @@
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.IOUtils;
 
@@ -49,7 +50,8 @@ public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
     private transient Catalog sourceCatalog;
     private transient Catalog targetCatalog;
 
-    private transient Map<String, Path> srcLocations;
+    private transient Map<String, FileIO> srcFileIOs;
+    private transient Map<String, FileIO> targetFileIOs;
     private transient Map<String, Path> targetLocations;
 
     public CopyFileOperator(
@@ -64,7 +66,8 @@ public void open() throws Exception {
                 FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
         targetCatalog =
                 FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
-        srcLocations = new HashMap<>();
+        srcFileIOs = new HashMap<>();
+        targetFileIOs = new HashMap<>();
         targetLocations = new HashMap<>();
     }
 
@@ -72,20 +75,32 @@ public void open() throws Exception {
     public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exception {
         CloneFileInfo cloneFileInfo = streamRecord.getValue();
 
-        FileIO sourceTableFileIO = sourceCatalog.fileIO();
-        FileIO targetTableFileIO = targetCatalog.fileIO();
-
-        Path sourceTableRootPath =
-                srcLocations.computeIfAbsent(
+        FileIO sourceTableFileIO =
+                srcFileIOs.computeIfAbsent(
                         cloneFileInfo.getSourceIdentifier(),
                         key -> {
                             try {
-                                return pathOfTable(
-                                        sourceCatalog.getTable(Identifier.fromString(key)));
+                                return ((FileStoreTable)
+                                                sourceCatalog.getTable(Identifier.fromString(key)))
+                                        .fileIO();
                             } catch (Catalog.TableNotExistException e) {
                                 throw new RuntimeException(e);
                             }
                         });
+
+        FileIO targetTableFileIO =
+                targetFileIOs.computeIfAbsent(
+                        cloneFileInfo.getTargetIdentifier(),
+                        key -> {
+                            try {
+                                return ((FileStoreTable)
+                                                targetCatalog.getTable(Identifier.fromString(key)))
+                                        .fileIO();
+                            } catch (Catalog.TableNotExistException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
         Path targetTableRootPath =
                 targetLocations.computeIfAbsent(
                         cloneFileInfo.getTargetIdentifier(),
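Since there is no longer a single catalog-level FileIO, CopyFileOperator caches one FileIO per table identifier and resolves it lazily from the table itself. The pattern distilled to its core (map and variable names are illustrative):

    Map<String, FileIO> fileIOs = new HashMap<>();
    FileIO fileIO =
            fileIOs.computeIfAbsent(
                    identifier,
                    key -> {
                        try {
                            return ((FileStoreTable) catalog.getTable(Identifier.fromString(key)))
                                    .fileIO();
                        } catch (Catalog.TableNotExistException e) {
                            throw new RuntimeException(e);
                        }
                    });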
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index 23bbbc9b609c..39dce07c5e7e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -32,7 +32,6 @@
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.SerializableConsumer;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -76,9 +75,9 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
     public FlinkOrphanFilesClean(
             FileStoreTable table,
             long olderThanMillis,
-            SerializableConsumer<Path> fileCleaner,
+            boolean dryRun,
             @Nullable Integer parallelism) {
-        super(table, olderThanMillis, fileCleaner);
+        super(table, olderThanMillis, dryRun);
         this.parallelism = parallelism;
     }
 
@@ -297,7 +296,7 @@ public void processElement2(
                     if (!used.contains(path.getName())) {
                         emittedFilesCount++;
                         emittedFilesLen += fileInfo.getRight();
-                        fileCleaner.accept(path);
+                        cleanFile(path);
                         LOG.info("Dry clean: {}", path);
                     }
                 }
@@ -319,7 +318,7 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             StreamExecutionEnvironment env,
             Catalog catalog,
             long olderThanMillis,
-            SerializableConsumer<Path> fileCleaner,
+            boolean dryRun,
             @Nullable Integer parallelism,
             String databaseName,
             @Nullable String tableName)
@@ -341,10 +340,7 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             DataStream<CleanOrphanFilesResult> clean =
                     new FlinkOrphanFilesClean(
-                                    (FileStoreTable) table,
-                                    olderThanMillis,
-                                    fileCleaner,
-                                    parallelism)
+                                    (FileStoreTable) table, olderThanMillis, dryRun, parallelism)
                             .doOrphanClean(env);
             if (clean != null) {
                 orphanFilesCleans.add(clean);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 8634e1e5e3f7..c3983c8aa528 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -30,7 +30,6 @@
 
 import java.util.Locale;
 
-import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner;
 import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;
 
 /**
@@ -85,7 +84,7 @@ public String[] call(
                                 procedureContext.getExecutionEnvironment(),
                                 catalog,
                                 olderThanMillis(olderThan),
-                                createFileCleaner(catalog, dryRun),
+                                dryRun != null && dryRun,
                                 parallelism,
                                 databaseName,
                                 tableName);
@@ -97,9 +96,8 @@ public String[] call(
                                 databaseName,
                                 tableName,
                                 olderThanMillis(olderThan),
-                                createFileCleaner(catalog, dryRun),
                                 parallelism,
-                                dryRun);
+                                dryRun != null && dryRun);
                 break;
             default:
                 throw new IllegalArgumentException(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
index 40c58a1c8a51..4d2e8b38555d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
@@ -18,6 +18,7 @@
 package org.apache.paimon.flink.procedure.privilege;
 
+import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.flink.procedure.ProcedureBase;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.privilege.FileBasedPrivilegeManager;
@@ -29,6 +30,8 @@
 import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
 
+import static org.apache.paimon.catalog.DelegateCatalog.rootCatalog;
+
 /**
  * Procedure to initialize file-based privilege system in warehouse. This procedure will
  * automatically create a root user with the provided password. Usage:
@@ -48,11 +51,18 @@ public String[] call(ProcedureContext procedureContext, String rootPassword) {
             throw new IllegalArgumentException("Catalog is already a PrivilegedCatalog");
         }
 
+        if (!(rootCatalog(catalog) instanceof AbstractCatalog)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Catalog %s cannot support Privileged Catalog.",
+                            rootCatalog(catalog).getClass().getName()));
+        }
+
         Options options = new Options(catalog.options());
         PrivilegeManager privilegeManager =
                 new FileBasedPrivilegeManager(
-                        catalog.warehouse(),
-                        catalog.fileIO(),
+                        ((AbstractCatalog) rootCatalog(catalog)).warehouse(),
+                        ((AbstractCatalog) rootCatalog(catalog)).fileIO(),
                         options.get(PrivilegedCatalog.USER),
                         options.get(PrivilegedCatalog.PASSWORD));
         privilegeManager.initializePrivilege(rootPassword);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index 604b1d9b1957..7ddf2eba03d2 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -120,7 +120,6 @@ public InternalRow[] call(InternalRow args) {
                                     identifier.getDatabaseName(),
                                     identifier.getTableName(),
                                     OrphanFilesClean.olderThanMillis(olderThan),
-                                    OrphanFilesClean.createFileCleaner(catalog, dryRun),
                                     parallelism,
                                     dryRun);
                     break;
@@ -131,7 +130,6 @@ public InternalRow[] call(InternalRow args) {
                                     identifier.getDatabaseName(),
                                     identifier.getTableName(),
                                     OrphanFilesClean.olderThanMillis(olderThan),
-                                    OrphanFilesClean.createFileCleaner(catalog, dryRun),
                                     parallelism,
                                     dryRun);
                     break;
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
index 328a11c01742..010a3e4edef8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
@@ -44,11 +44,10 @@ import scala.collection.mutable.ArrayBuffer
 
 case class SparkOrphanFilesClean(
     specifiedTable: FileStoreTable,
     specifiedOlderThanMillis: Long,
-    specifiedFileCleaner: SerializableConsumer[Path],
     parallelism: Int,
-    dryRun: Boolean,
+    dryRunPara: Boolean,
     @transient spark: SparkSession)
-  extends OrphanFilesClean(specifiedTable, specifiedOlderThanMillis, specifiedFileCleaner)
+  extends OrphanFilesClean(specifiedTable, specifiedOlderThanMillis, dryRunPara)
   with SQLConfHelper
   with Logging {
@@ -150,7 +149,7 @@ case class SparkOrphanFilesClean(
           val pathToClean = fileInfo.getString(1)
           val deletedPath = new Path(pathToClean)
           deletedFilesLenInBytes += fileInfo.getLong(2)
-          specifiedFileCleaner.accept(deletedPath)
+          cleanFile(deletedPath)
           logInfo(s"Cleaned file: $pathToClean")
           dataDirs.add(fileInfo.getString(3))
           deletedFilesCount += 1
@@ -198,7 +197,6 @@ object SparkOrphanFilesClean extends SQLConfHelper {
       databaseName: String,
       tableName: String,
       olderThanMillis: Long,
-      fileCleaner: SerializableConsumer[Path],
       parallelismOpt: Integer,
       dryRun: Boolean): CleanOrphanFilesResult = {
     val spark = SparkSession.active
@@ -230,7 +228,6 @@ object SparkOrphanFilesClean extends SQLConfHelper {
       new SparkOrphanFilesClean(
         table,
         olderThanMillis,
-        fileCleaner,
         parallelism,
         dryRun,
         spark