diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 953096c0b50c..22987c6292e5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -232,7 +232,11 @@ public StatsFileHandler newStatsFileHandler() { } protected ManifestsReader newManifestsReader(boolean forWrite) { - return new ManifestsReader(partitionType, snapshotManager(), manifestListFactory(forWrite)); + return new ManifestsReader( + partitionType, + options.partitionDefaultName(), + snapshotManager(), + manifestListFactory(forWrite)); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 439f456efb4c..a6790004a116 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -50,10 +50,8 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -160,32 +158,11 @@ public Database getDatabase(String name) throws DatabaseNotExistException { protected abstract Database getDatabaseImpl(String name) throws DatabaseNotExistException; @Override - public void createPartition(Identifier identifier, Map partitionSpec) - throws TableNotExistException { - Identifier tableIdentifier = - Identifier.create(identifier.getDatabaseName(), identifier.getTableName()); - FileStoreTable table = (FileStoreTable) getTable(tableIdentifier); - - if (table.partitionKeys().isEmpty() || !table.coreOptions().partitionedTableInMetastore()) { - return; - } - - MetastoreClient.Factory metastoreFactory = - table.catalogEnvironment().metastoreClientFactory(); - if (metastoreFactory == null) { - throw new UnsupportedOperationException( - "The catalog must have metastore to create partition."); - } - - try (MetastoreClient client = metastoreFactory.create()) { - client.addPartition(new LinkedHashMap<>(partitionSpec)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } + public void createPartitions(Identifier identifier, List> partitions) + throws TableNotExistException {} @Override - public void dropPartition(Identifier identifier, Map partitionSpec) + public void dropPartitions(Identifier identifier, List> partitions) throws TableNotExistException { checkNotSystemTable(identifier, "dropPartition"); Table table = getTable(identifier); @@ -195,11 +172,18 @@ public void dropPartition(Identifier identifier, Map partitionSp .store() .newCommit( createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) { - commit.dropPartitions( - Collections.singletonList(partitionSpec), BatchWriteBuilder.COMMIT_IDENTIFIER); + commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER); } } + @Override + public void alterPartitions(Identifier identifier, List partitions) + throws TableNotExistException {} + + @Override + public void markDonePartitions(Identifier identifier, List> partitions) + throws TableNotExistException {} + @Override public List listPartitions(Identifier identifier) throws TableNotExistException { return listPartitionsFromFileSystem(getTable(identifier)); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index 4796276972b9..23408e569250 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -295,9 +295,18 @@ public List listPartitions(Identifier identifier) throws TableNotExis } @Override - public void dropPartition(Identifier identifier, Map partitions) - throws TableNotExistException, PartitionNotExistException { - wrapped.dropPartition(identifier, partitions); + public void dropPartitions(Identifier identifier, List> partitions) + throws TableNotExistException { + wrapped.dropPartitions(identifier, partitions); + if (partitionCache != null) { + partitionCache.invalidate(identifier); + } + } + + @Override + public void alterPartitions(Identifier identifier, List partitions) + throws TableNotExistException { + wrapped.alterPartitions(identifier, partitions); if (partitionCache != null) { partitionCache.invalidate(identifier); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index e90d3c79c51c..e7d07d6dc433 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -73,6 +73,8 @@ public interface Catalog extends AutoCloseable { /** Return a boolean that indicates whether this catalog is case-sensitive. */ boolean caseSensitive(); + // ======================= database methods =============================== + /** * Get the names of all databases in this catalog. * @@ -139,6 +141,8 @@ void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) void alterDatabase(String name, List changes, boolean ignoreIfNotExists) throws DatabaseNotExistException; + // ======================= table methods =============================== + /** * Return a {@link Table} identified by the given {@link Identifier}. * @@ -231,52 +235,81 @@ void alterTable(Identifier identifier, List changes, boolean ignor default void invalidateTable(Identifier identifier) {} /** - * Create the partition of the specify table. + * Modify an existing table from a {@link SchemaChange}. + * + *

NOTE: System tables can not be altered. + * + * @param identifier path of the table to be modified + * @param change the schema change + * @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to + * false, throw an exception, if set to true, do nothing. + * @throws TableNotExistException if the table does not exist + */ + default void alterTable(Identifier identifier, SchemaChange change, boolean ignoreIfNotExists) + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { + alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists); + } + + // ======================= partition methods =============================== + + /** + * Create partitions of the specify table. * *

Only catalog with metastore can support this method, and only table with * 'metastore.partitioned-table' can support this method. * - * @param identifier path of the table to drop partition - * @param partitionSpec the partition to be created + * @param identifier path of the table to create partitions + * @param partitions partitions to be created * @throws TableNotExistException if the table does not exist */ - void createPartition(Identifier identifier, Map partitionSpec) + void createPartitions(Identifier identifier, List> partitions) throws TableNotExistException; /** - * Drop the partition of the specify table. + * Drop partitions of the specify table. * - * @param identifier path of the table to drop partition - * @param partition the partition to be deleted + * @param identifier path of the table to drop partitions + * @param partitions partitions to be deleted * @throws TableNotExistException if the table does not exist - * @throws PartitionNotExistException if the partition does not exist */ - void dropPartition(Identifier identifier, Map partition) - throws TableNotExistException, PartitionNotExistException; + void dropPartitions(Identifier identifier, List> partitions) + throws TableNotExistException; /** - * Get Partition of all partitions of the table. + * Alter partitions of the specify table. * - * @param identifier path of the table to list partitions + *

Only catalog with metastore can support this method, and only table with + * 'metastore.partitioned-table' can support this method. + * + * @param identifier path of the table to alter partitions + * @param partitions partitions to be altered * @throws TableNotExistException if the table does not exist */ - List listPartitions(Identifier identifier) throws TableNotExistException; + void alterPartitions(Identifier identifier, List partitions) + throws TableNotExistException; /** - * Modify an existing table from a {@link SchemaChange}. + * Mark partitions done of the specify table. * - *

NOTE: System tables can not be altered. + *

Only catalog with metastore can support this method, and only table with + * 'metastore.partitioned-table' can support this method. * - * @param identifier path of the table to be modified - * @param change the schema change - * @param ignoreIfNotExists flag to specify behavior when the table does not exist: if set to - * false, throw an exception, if set to true, do nothing. + * @param identifier path of the table to mark done partitions + * @param partitions partitions to be marked done * @throws TableNotExistException if the table does not exist */ - default void alterTable(Identifier identifier, SchemaChange change, boolean ignoreIfNotExists) - throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { - alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists); - } + void markDonePartitions(Identifier identifier, List> partitions) + throws TableNotExistException; + + /** + * Get Partition of all partitions of the table. + * + * @param identifier path of the table to list partitions + * @throws TableNotExistException if the table does not exist + */ + List listPartitions(Identifier identifier) throws TableNotExistException; + + // ======================= view methods =============================== /** * Return a {@link View} identified by the given {@link Identifier}. @@ -340,6 +373,8 @@ default void renameView(Identifier fromView, Identifier toView, boolean ignoreIf throw new UnsupportedOperationException(); } + // ======================= repair methods =============================== + /** * Repair the entire Catalog, repair the metadata in the metastore consistent with the metadata * in the filesystem, register missing tables in the metastore. @@ -508,36 +543,6 @@ public Identifier identifier() { } } - /** Exception for trying to operate on a partition that doesn't exist. */ - class PartitionNotExistException extends Exception { - - private static final String MSG = "Partition %s do not exist in the table %s."; - - private final Identifier identifier; - - private final Map partitionSpec; - - public PartitionNotExistException( - Identifier identifier, Map partitionSpec) { - this(identifier, partitionSpec, null); - } - - public PartitionNotExistException( - Identifier identifier, Map partitionSpec, Throwable cause) { - super(String.format(MSG, partitionSpec, identifier.getFullName()), cause); - this.identifier = identifier; - this.partitionSpec = partitionSpec; - } - - public Identifier identifier() { - return identifier; - } - - public Map partitionSpec() { - return partitionSpec; - } - } - /** Exception for trying to alter a column that already exists. */ class ColumnAlreadyExistException extends Exception { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index cddb76e6835b..fabfa50fc4d7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -131,7 +131,7 @@ public static List listPartitionsFromFileSystem(Table table) { InternalRowPartitionComputer computer = new InternalRowPartitionComputer( options.get(PARTITION_DEFAULT_NAME), - table.rowType(), + table.rowType().project(table.partitionKeys()), table.partitionKeys().toArray(new String[0]), options.get(PARTITION_GENERATE_LEGCY_NAME)); List partitionEntries = diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index e2d1a94cfaff..23c50e998635 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -119,6 +119,30 @@ public void alterTable( wrapped.alterTable(identifier, changes, ignoreIfNotExists); } + @Override + public void createPartitions(Identifier identifier, List> partitions) + throws TableNotExistException { + wrapped.createPartitions(identifier, partitions); + } + + @Override + public void dropPartitions(Identifier identifier, List> partitions) + throws TableNotExistException { + wrapped.dropPartitions(identifier, partitions); + } + + @Override + public void alterPartitions(Identifier identifier, List partitions) + throws TableNotExistException { + wrapped.alterPartitions(identifier, partitions); + } + + @Override + public void markDonePartitions(Identifier identifier, List> partitions) + throws TableNotExistException { + wrapped.markDonePartitions(identifier, partitions); + } + @Override public Table getTable(Identifier identifier) throws TableNotExistException { return wrapped.getTable(identifier); @@ -152,18 +176,6 @@ public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfN wrapped.renameView(fromView, toView, ignoreIfNotExists); } - @Override - public void createPartition(Identifier identifier, Map partitions) - throws TableNotExistException { - wrapped.createPartition(identifier, partitions); - } - - @Override - public void dropPartition(Identifier identifier, Map partitions) - throws TableNotExistException, PartitionNotExistException { - wrapped.dropPartition(identifier, partitions); - } - @Override public List listPartitions(Identifier identifier) throws TableNotExistException { return wrapped.listPartitions(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 4f8a1f3264de..27ba4703b979 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -118,6 +118,12 @@ public FileStoreScan withPartitionFilter(List partitions) { return this; } + @Override + public FileStoreScan withPartitionsFilter(List> partitions) { + manifestsReader.withPartitionsFilter(partitions); + return this; + } + @Override public FileStoreScan withPartitionFilter(PartitionPredicate predicate) { manifestsReader.withPartitionFilter(predicate); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 179d16de6cd2..99ae3ef47d09 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -53,6 +53,8 @@ public interface FileStoreScan { FileStoreScan withPartitionFilter(List partitions); + FileStoreScan withPartitionsFilter(List> partitions); + FileStoreScan withPartitionFilter(PartitionPredicate predicate); FileStoreScan withBucket(int bucket); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java index 5ee468af20ea..2eaa3646f759 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java @@ -34,13 +34,17 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; +import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions; + /** A util class to read manifest files. */ @ThreadSafe public class ManifestsReader { private final RowType partitionType; + private final String partitionDefaultValue; private final SnapshotManager snapshotManager; private final ManifestList.Factory manifestListFactory; @@ -48,9 +52,11 @@ public class ManifestsReader { public ManifestsReader( RowType partitionType, + String partitionDefaultValue, SnapshotManager snapshotManager, ManifestList.Factory manifestListFactory) { this.partitionType = partitionType; + this.partitionDefaultValue = partitionDefaultValue; this.snapshotManager = snapshotManager; this.manifestListFactory = manifestListFactory; } @@ -65,6 +71,11 @@ public ManifestsReader withPartitionFilter(List partitions) { return this; } + public ManifestsReader withPartitionsFilter(List> partitions) { + return withPartitionFilter( + createBinaryPartitions(partitions, partitionType, partitionDefaultValue)); + } + public ManifestsReader withPartitionFilter(PartitionPredicate predicate) { this.partitionFilter = predicate; return this; diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java index 35822471a2d6..6be09fa9b992 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java @@ -25,6 +25,7 @@ import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.ConfigOptions; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.FileStoreTable; @@ -143,10 +144,31 @@ public Table getTable(Identifier identifier) throws TableNotExistException { } @Override - public void dropPartition(Identifier identifier, Map partitions) - throws TableNotExistException, PartitionNotExistException { + public void createPartitions(Identifier identifier, List> partitions) + throws TableNotExistException { + privilegeManager.getPrivilegeChecker().assertCanInsert(identifier); + wrapped.createPartitions(identifier, partitions); + } + + @Override + public void dropPartitions(Identifier identifier, List> partitions) + throws TableNotExistException { + privilegeManager.getPrivilegeChecker().assertCanInsert(identifier); + wrapped.dropPartitions(identifier, partitions); + } + + @Override + public void alterPartitions(Identifier identifier, List partitions) + throws TableNotExistException { + privilegeManager.getPrivilegeChecker().assertCanInsert(identifier); + wrapped.alterPartitions(identifier, partitions); + } + + @Override + public void markDonePartitions(Identifier identifier, List> partitions) + throws TableNotExistException { privilegeManager.getPrivilegeChecker().assertCanInsert(identifier); - wrapped.dropPartition(identifier, partitions); + wrapped.markDonePartitions(identifier, partitions); } public void createPrivilegedUser(String user, String password) { diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index c547656e7cb7..8659fbf655bb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -41,9 +41,7 @@ import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.AlterTableRequest; import org.apache.paimon.rest.requests.CreateDatabaseRequest; -import org.apache.paimon.rest.requests.CreatePartitionRequest; import org.apache.paimon.rest.requests.CreateTableRequest; -import org.apache.paimon.rest.requests.DropPartitionRequest; import org.apache.paimon.rest.requests.RenameTableRequest; import org.apache.paimon.rest.responses.AlterDatabaseResponse; import org.apache.paimon.rest.responses.ConfigResponse; @@ -53,7 +51,6 @@ import org.apache.paimon.rest.responses.ListDatabasesResponse; import org.apache.paimon.rest.responses.ListPartitionsResponse; import org.apache.paimon.rest.responses.ListTablesResponse; -import org.apache.paimon.rest.responses.PartitionResponse; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.TableSchema; @@ -62,7 +59,6 @@ import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.Table; import org.apache.paimon.table.object.ObjectTable; -import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; @@ -84,7 +80,6 @@ import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; -import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; @@ -362,56 +357,27 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists) } @Override - public void createPartition(Identifier identifier, Map partitionSpec) + public void createPartitions(Identifier identifier, List> partitions) throws TableNotExistException { - Table table = getTable(identifier); - Options options = Options.fromMap(table.options()); - if (!options.get(METASTORE_PARTITIONED_TABLE)) { - return; - } - - try { - CreatePartitionRequest request = new CreatePartitionRequest(identifier, partitionSpec); - client.post( - resourcePaths.partitions( - identifier.getDatabaseName(), identifier.getTableName()), - request, - PartitionResponse.class, - headers()); - } catch (NoSuchResourceException e) { - throw new TableNotExistException(identifier); - } catch (ForbiddenException e) { - throw new TableNoPermissionException(identifier, e); - } + throw new UnsupportedOperationException(); } @Override - public void dropPartition(Identifier identifier, Map partition) - throws TableNotExistException, PartitionNotExistException { - checkNotSystemTable(identifier, "dropPartition"); + public void dropPartitions(Identifier identifier, List> partitions) + throws TableNotExistException { + throw new UnsupportedOperationException(); + } - Table table = getTable(identifier); - Options options = Options.fromMap(table.options()); - if (options.get(METASTORE_PARTITIONED_TABLE)) { - try { - client.delete( - resourcePaths.partitions( - identifier.getDatabaseName(), identifier.getTableName()), - new DropPartitionRequest(partition), - headers()); - } catch (NoSuchResourceException ignore) { - throw new PartitionNotExistException(identifier, partition); - } catch (ForbiddenException e) { - throw new TableNoPermissionException(identifier, e); - } - } + @Override + public void alterPartitions(Identifier identifier, List partitions) + throws TableNotExistException { + throw new UnsupportedOperationException(); + } - try (BatchTableCommit commit = - table.newBatchWriteBuilder().withOverwrite(partition).newCommit()) { - commit.commit(Collections.emptyList()); - } catch (Exception e) { - throw new RuntimeException(e); - } + @Override + public void markDonePartitions(Identifier identifier, List> partitions) + throws TableNotExistException { + throw new UnsupportedOperationException(); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index e3e290f06086..eb405a5d5d3c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -33,6 +33,7 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DataTableScan; import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.table.source.TableScan; @@ -258,6 +259,13 @@ public Scan withPartitionFilter(List partitions) { return this; } + @Override + public InnerTableScan withPartitionsFilter(List> partitions) { + mainScan.withPartitionsFilter(partitions); + fallbackScan.withPartitionsFilter(partitions); + return this; + } + @Override public Scan withBucketFilter(Filter bucketFilter) { mainScan.withBucketFilter(bucketFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index a5810bfc24b2..59b11281cc78 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -101,6 +101,12 @@ public AbstractDataTableScan withPartitionFilter(List partitions) { return this; } + @Override + public AbstractDataTableScan withPartitionsFilter(List> partitions) { + snapshotReader.withPartitionsFilter(partitions); + return this; + } + @Override public AbstractDataTableScan withLevelFilter(Filter levelFilter) { snapshotReader.withLevelFilter(levelFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index c2425ff16f97..f7d609187da5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -39,6 +39,10 @@ default InnerTableScan withPartitionFilter(Map partitionSpec) { return this; } + default InnerTableScan withPartitionsFilter(List> partitions) { + return this; + } + default InnerTableScan withPartitionFilter(List partitions) { return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index f3e0a92b8fc7..3329ab95fcc9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -73,6 +73,8 @@ public interface SnapshotReader { SnapshotReader withPartitionFilter(List partitions); + SnapshotReader withPartitionsFilter(List> partitions); + SnapshotReader withMode(ScanMode scanMode); SnapshotReader withLevelFilter(Filter levelFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 43a6d3c8721c..032738c38c95 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -192,6 +192,12 @@ public SnapshotReader withPartitionFilter(List partitions) { return this; } + @Override + public SnapshotReader withPartitionsFilter(List> partitions) { + scan.withPartitionsFilter(partitions); + return this; + } + @Override public SnapshotReader withFilter(Predicate predicate) { List partitionKeys = tableSchema.partitionKeys(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 1cb967f8d1e2..005535094ef5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -313,6 +313,12 @@ public SnapshotReader withPartitionFilter(List partitions) { return this; } + @Override + public SnapshotReader withPartitionsFilter(List> partitions) { + wrapped.withPartitionsFilter(partitions); + return this; + } + @Override public SnapshotReader withMode(ScanMode scanMode) { wrapped.withMode(scanMode); @@ -446,6 +452,12 @@ public InnerTableScan withPartitionFilter(List partitions) { return this; } + @Override + public InnerTableScan withPartitionsFilter(List> partitions) { + batchScan.withPartitionsFilter(partitions); + return this; + } + @Override public InnerTableScan withBucketFilter(Filter bucketFilter) { batchScan.withBucketFilter(bucketFilter); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 627b02c1e3b7..344807b4c96e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -50,9 +50,7 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; @@ -317,106 +315,6 @@ public void testDropTableWhenTableNotExistAndIgnoreIfNotExistsIsFalse() throws E () -> restCatalog.dropTable(Identifier.create(databaseName, tableName), false)); } - @Test - public void testCreatePartition() throws Exception { - String databaseName = MockRESTMessage.databaseName(); - GetTableResponse response = MockRESTMessage.getTableResponse(); - mockResponse(mapper.writeValueAsString(response), 200); - - Map partitionSpec = new HashMap<>(); - partitionSpec.put("p1", "v1"); - mockResponse(mapper.writeValueAsString(MockRESTMessage.partitionResponse()), 200); - assertDoesNotThrow( - () -> - restCatalog.createPartition( - Identifier.create(databaseName, "table"), partitionSpec)); - } - - @Test - public void testCreatePartitionWhenTableNotExist() throws Exception { - String databaseName = MockRESTMessage.databaseName(); - Map partitionSpec = new HashMap<>(); - partitionSpec.put("p1", "v1"); - mockResponse("", 404); - assertThrows( - Catalog.TableNotExistException.class, - () -> - restCatalog.createPartition( - Identifier.create(databaseName, "table"), partitionSpec)); - } - - @Test - public void testCreatePartitionWhenTableNoPermissionException() throws Exception { - String databaseName = MockRESTMessage.databaseName(); - Map partitionSpec = new HashMap<>(); - partitionSpec.put("p1", "v1"); - mockResponse("", 403); - assertThrows( - Catalog.TableNoPermissionException.class, - () -> - restCatalog.createPartition( - Identifier.create(databaseName, "table"), partitionSpec)); - } - - @Test - public void testDropPartition() throws Exception { - String databaseName = MockRESTMessage.databaseName(); - Map partitionSpec = new HashMap<>(); - GetTableResponse response = MockRESTMessage.getTableResponse(); - partitionSpec.put(response.getSchema().primaryKeys().get(0), "1"); - mockResponse(mapper.writeValueAsString(""), 200); - mockResponse(mapper.writeValueAsString(response), 200); - assertThrows( - RuntimeException.class, - () -> - restCatalog.dropPartition( - Identifier.create(databaseName, "table"), partitionSpec)); - } - - @Test - public void testDropPartitionWhenPartitionNoExist() throws Exception { - String databaseName = MockRESTMessage.databaseName(); - GetTableResponse response = MockRESTMessage.getTableResponseEnablePartition(); - mockResponse(mapper.writeValueAsString(response), 200); - - Map partitionSpec = new HashMap<>(); - partitionSpec.put(response.getSchema().primaryKeys().get(0), "1"); - mockResponse(mapper.writeValueAsString(""), 404); - assertThrows( - Catalog.PartitionNotExistException.class, - () -> - restCatalog.dropPartition( - Identifier.create(databaseName, "table"), partitionSpec)); - } - - @Test - public void testDropPartitionWhenTableNoPermission() throws Exception { - String databaseName = MockRESTMessage.databaseName(); - Map partitionSpec = new HashMap<>(); - GetTableResponse response = MockRESTMessage.getTableResponse(); - partitionSpec.put(response.getSchema().primaryKeys().get(0), "1"); - mockResponse(mapper.writeValueAsString(""), 403); - assertThrows( - Catalog.TableNoPermissionException.class, - () -> - restCatalog.dropPartition( - Identifier.create(databaseName, "table"), partitionSpec)); - } - - @Test - public void testDropPartitionWhenTableNoExist() throws Exception { - String databaseName = MockRESTMessage.databaseName(); - Map partitionSpec = new HashMap<>(); - GetTableResponse response = MockRESTMessage.getTableResponse(); - partitionSpec.put(response.getSchema().primaryKeys().get(0), "1"); - mockResponse("", 404); - assertThrows( - Catalog.TableNotExistException.class, - () -> - restCatalog.dropPartition( - Identifier.create(databaseName, "table"), partitionSpec)); - } - @Test public void testListPartitionsWhenMetastorePartitionedIsTrue() throws Exception { String databaseName = MockRESTMessage.databaseName(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index ec3c4a47a69d..7d19db31770a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -1421,7 +1421,8 @@ public final void createPartition( try { Identifier identifier = toIdentifier(tablePath); - catalog.createPartition(identifier, partitionSpec.getPartitionSpec()); + catalog.createPartitions( + identifier, Collections.singletonList(partitionSpec.getPartitionSpec())); } catch (Catalog.TableNotExistException e) { throw new CatalogException(e); } @@ -1440,11 +1441,10 @@ public final void dropPartition( try { Identifier identifier = toIdentifier(tablePath); - catalog.dropPartition(identifier, partitionSpec.getPartitionSpec()); + catalog.dropPartitions( + identifier, Collections.singletonList(partitionSpec.getPartitionSpec())); } catch (Catalog.TableNotExistException e) { throw new CatalogException(e); - } catch (Catalog.PartitionNotExistException e) { - throw new PartitionNotExistException(getName(), tablePath, partitionSpec); } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index a213909beb10..5afb60e84f82 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -50,6 +50,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.PartitionPathUtils; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.view.View; import org.apache.paimon.view.ViewImpl; @@ -66,6 +67,7 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; @@ -105,6 +107,7 @@ import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; +import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout; import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep; import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; @@ -145,6 +148,8 @@ public class HiveCatalog extends AbstractCatalog { public static final String HIVE_SITE_FILE = "hive-site.xml"; private static final String HIVE_EXTERNAL_TABLE_PROP = "EXTERNAL"; private static final int DEFAULT_TABLE_BATCH_SIZE = 300; + private static final String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime"; + private final HiveConf hiveConf; private final String clientClassName; private final Options options; @@ -344,40 +349,177 @@ public org.apache.paimon.catalog.Database getDatabaseImpl(String name) } } + private boolean metastorePartitioned(TableSchema schema) { + CoreOptions options = CoreOptions.fromMap(schema.options()); + return (!schema.partitionKeys().isEmpty() && options.partitionedTableInMetastore()) + || options.tagToPartitionField() != null; + } + + @Override + public void createPartitions(Identifier identifier, List> partitions) + throws TableNotExistException { + Identifier tableIdentifier = + Identifier.create(identifier.getDatabaseName(), identifier.getTableName()); + Table hmsTable = getHmsTable(tableIdentifier); + Path location = getTableLocation(tableIdentifier, hmsTable); + TableSchema schema = getDataTableSchema(tableIdentifier, hmsTable); + + if (!metastorePartitioned(schema)) { + return; + } + + int currentTime = (int) (System.currentTimeMillis() / 1000); + + try { + List hivePartitions = + toHivePartitions( + identifier, + location.toString(), + hmsTable.getSd(), + partitions, + currentTime); + clients.execute(client -> client.add_partitions(hivePartitions, true, false)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void dropPartitions(Identifier identifier, List> partitions) + throws TableNotExistException { + TableSchema schema = getDataTableSchema(identifier); + CoreOptions options = CoreOptions.fromMap(schema.options()); + boolean tagToPart = options.tagToPartitionField() != null; + if (metastorePartitioned(schema)) { + List> metaPartitions = + tagToPart + ? partitions + : removePartitionsExistsInOtherBranches(identifier, partitions); + for (Map part : metaPartitions) { + List partitionValues = new ArrayList<>(part.values()); + try { + clients.execute( + client -> + client.dropPartition( + identifier.getDatabaseName(), + identifier.getTableName(), + partitionValues, + false)); + } catch (NoSuchObjectException e) { + // do nothing if the partition not exists + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + if (!tagToPart) { + super.dropPartitions(identifier, partitions); + } + } + @Override - public void dropPartition(Identifier identifier, Map partitionSpec) + public void alterPartitions( + Identifier identifier, List partitions) throws TableNotExistException { TableSchema tableSchema = getDataTableSchema(identifier); if (!tableSchema.partitionKeys().isEmpty() - && new CoreOptions(tableSchema.options()).partitionedTableInMetastore() - && !partitionExistsInOtherBranches(identifier, partitionSpec)) { + && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) { + for (org.apache.paimon.partition.Partition partition : partitions) { + Map spec = partition.spec(); + List partitionValues = + tableSchema.partitionKeys().stream() + .map(spec::get) + .collect(Collectors.toList()); + + Map statistic = new HashMap<>(); + statistic.put(NUM_FILES_PROP, String.valueOf(partition.fileCount())); + statistic.put(TOTAL_SIZE_PROP, String.valueOf(partition.fileSizeInBytes())); + statistic.put(NUM_ROWS_PROP, String.valueOf(partition.recordCount())); + + String modifyTimeSeconds = String.valueOf(partition.lastFileCreationTime() / 1000); + statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds); + + // just for being compatible with hive metastore + statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds); + + try { + Partition hivePartition = + clients.run( + client -> + client.getPartition( + identifier.getDatabaseName(), + identifier.getObjectName(), + partitionValues)); + hivePartition.setValues(partitionValues); + hivePartition.setLastAccessTime( + (int) (partition.lastFileCreationTime() / 1000)); + hivePartition.getParameters().putAll(statistic); + clients.execute( + client -> + client.alter_partition( + identifier.getDatabaseName(), + identifier.getObjectName(), + hivePartition)); + } catch (NoSuchObjectException e) { + // do nothing if the partition not exists + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + } + + @Override + public List listPartitions(Identifier identifier) + throws TableNotExistException { + FileStoreTable table = (FileStoreTable) getTable(identifier); + String tagToPartitionField = table.coreOptions().tagToPartitionField(); + if (tagToPartitionField != null) { try { - // Do not close client, it is for HiveCatalog - @SuppressWarnings("resource") - HiveMetastoreClient metastoreClient = - new HiveMetastoreClient( - new Identifier( - identifier.getDatabaseName(), identifier.getTableName()), - clients); - metastoreClient.dropPartition(new LinkedHashMap<>(partitionSpec)); + List partitions = + clients.run( + client -> + client.listPartitions( + identifier.getDatabaseName(), + identifier.getTableName(), + Short.MAX_VALUE)); + return partitions.stream() + .map( + part -> + new org.apache.paimon.partition.Partition( + Collections.singletonMap( + tagToPartitionField, + part.getValues().get(0)), + 1L, + 1L, + 1L, + System.currentTimeMillis())) + .collect(Collectors.toList()); } catch (Exception e) { throw new RuntimeException(e); } } - super.dropPartition(identifier, partitionSpec); + return listPartitionsFromFileSystem(table); } - private boolean partitionExistsInOtherBranches( - Identifier identifier, Map partitionSpec) - throws TableNotExistException { + private List> removePartitionsExistsInOtherBranches( + Identifier identifier, List> inputs) throws TableNotExistException { FileStoreTable mainTable = (FileStoreTable) getTable( new Identifier( identifier.getDatabaseName(), identifier.getTableName())); + + InternalRowPartitionComputer partitionComputer = + new InternalRowPartitionComputer( + mainTable.coreOptions().partitionDefaultName(), + mainTable.rowType().project(mainTable.partitionKeys()), + mainTable.partitionKeys().toArray(new String[0]), + mainTable.coreOptions().legacyPartitionName()); List branchNames = new ArrayList<>(mainTable.branchManager().branches()); branchNames.add(DEFAULT_MAIN_BRANCH); + Set> inputsToRemove = new HashSet<>(inputs); for (String branchName : branchNames) { if (branchName.equals(identifier.getBranchNameOrDefault())) { continue; @@ -389,12 +531,13 @@ private boolean partitionExistsInOtherBranches( continue; } - FileStoreTable table = mainTable.switchToBranch(branchName); - if (!table.newScan().withPartitionFilter(partitionSpec).listPartitions().isEmpty()) { - return true; - } + mainTable.switchToBranch(branchName).newScan() + .withPartitionsFilter(new ArrayList<>(inputsToRemove)).listPartitions().stream() + .map(partitionComputer::generatePartValues) + .forEach(inputsToRemove::remove); } - return false; + + return new ArrayList<>(inputsToRemove); } @Override @@ -1464,4 +1607,30 @@ public int getBatchGetTableSize() { return DEFAULT_TABLE_BATCH_SIZE; } } + + private List toHivePartitions( + Identifier identifier, + String tablePath, + StorageDescriptor sd, + List> partitions, + int currentTime) { + List hivePartitions = new ArrayList<>(); + for (Map partitionSpec : partitions) { + Partition hivePartition = new Partition(); + StorageDescriptor newSd = new StorageDescriptor(sd); + newSd.setLocation( + tablePath + + "/" + + PartitionPathUtils.generatePartitionPath( + new LinkedHashMap<>(partitionSpec))); + hivePartition.setDbName(identifier.getDatabaseName()); + hivePartition.setTableName(identifier.getTableName()); + hivePartition.setValues(new ArrayList<>(partitionSpec.values())); + hivePartition.setSd(newSd); + hivePartition.setCreateTime(currentTime); + hivePartition.setLastAccessTime(currentTime); + hivePartitions.add(hivePartition); + } + return hivePartitions; + } } diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java index e733ec16c839..d96fac808cab 100644 --- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java +++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java @@ -25,6 +25,7 @@ import org.apache.paimon.client.ClientPool; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.types.DataField; @@ -55,6 +56,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY; +import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE; +import static org.apache.paimon.CoreOptions.METASTORE_TAG_TO_PARTITION; import static org.apache.paimon.hive.HiveCatalog.PAIMON_TABLE_IDENTIFIER; import static org.apache.paimon.hive.HiveCatalog.TABLE_TYPE_PROP; import static org.assertj.core.api.Assertions.assertThat; @@ -448,4 +451,56 @@ public void testCreateExternalTableWithLocation(@TempDir java.nio.file.Path temp externalWarehouseCatalog.close(); } + + @Test + public void testTagToPartitionTable() throws Exception { + String databaseName = "testTagToPartitionTable"; + catalog.dropDatabase(databaseName, true, true); + catalog.createDatabase(databaseName, true); + Identifier identifier = Identifier.create(databaseName, "table"); + catalog.createTable( + identifier, + Schema.newBuilder() + .option(METASTORE_TAG_TO_PARTITION.key(), "dt") + .column("col", DataTypes.INT()) + .column("dt", DataTypes.STRING()) + .build(), + true); + + catalog.createPartitions( + identifier, + Arrays.asList( + Collections.singletonMap("dt", "20250101"), + Collections.singletonMap("dt", "20250102"))); + assertThat(catalog.listPartitions(identifier).stream().map(Partition::spec)) + .containsExactlyInAnyOrder( + Collections.singletonMap("dt", "20250102"), + Collections.singletonMap("dt", "20250101")); + } + + @Test + public void testPartitionTable() throws Exception { + String databaseName = "testPartitionTable"; + catalog.dropDatabase(databaseName, true, true); + catalog.createDatabase(databaseName, true); + Identifier identifier = Identifier.create(databaseName, "table"); + catalog.createTable( + identifier, + Schema.newBuilder() + .option(METASTORE_PARTITIONED_TABLE.key(), "true") + .column("col", DataTypes.INT()) + .column("dt", DataTypes.STRING()) + .partitionKeys("dt") + .build(), + true); + + catalog.createPartitions( + identifier, + Arrays.asList( + Collections.singletonMap("dt", "20250101"), + Collections.singletonMap("dt", "20250102"))); + + // hive catalog list partitions from filesystem, so here return empty. + assertThat(catalog.listPartitions(identifier)).isEmpty(); + } }