diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 02e662350ffd..439f456efb4c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -24,11 +24,11 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; -import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; @@ -65,6 +65,7 @@ import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; +import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS; @@ -200,9 +201,8 @@ public void dropPartition(Identifier identifier, Map partitionSp } @Override - public List listPartitions(Identifier identifier) - throws TableNotExistException { - return getTable(identifier).newReadBuilder().newScan().listPartitionEntries(); + public List listPartitions(Identifier identifier) throws TableNotExistException { + return listPartitionsFromFileSystem(getTable(identifier)); } protected abstract void createDatabaseImpl(String name, Map properties); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index 34e53f32f267..4796276972b9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -19,9 +19,9 @@ package org.apache.paimon.catalog; import org.apache.paimon.fs.Path; -import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; @@ -61,7 +61,7 @@ public class CachingCatalog extends DelegateCatalog { @Nullable protected final SegmentsCache manifestCache; // partition cache will affect data latency - @Nullable protected final Cache> partitionCache; + @Nullable protected final Cache> partitionCache; public CachingCatalog(Catalog wrapped) { this( @@ -130,7 +130,7 @@ public CachingCatalog( .executor(Runnable::run) .expireAfterAccess(expirationInterval) .weigher( - (Weigher>) + (Weigher>) (identifier, v) -> v.size()) .maximumWeight(cachedPartitionMaxNum) .ticker(ticker) @@ -281,13 +281,12 @@ private void putTableCache(Identifier identifier, Table table) { } @Override - public List listPartitions(Identifier identifier) - throws TableNotExistException { + public List listPartitions(Identifier identifier) throws TableNotExistException { if (partitionCache == null) { return wrapped.listPartitions(identifier); } - List result = partitionCache.getIfPresent(identifier); + List result = partitionCache.getIfPresent(identifier); if (result == null) { result = wrapped.listPartitions(identifier); partitionCache.put(identifier, result); @@ -321,7 +320,7 @@ public void invalidateTable(Identifier identifier) { */ public void refreshPartitions(Identifier identifier) throws TableNotExistException { if (partitionCache != null) { - List result = wrapped.listPartitions(identifier); + List result = wrapped.listPartitions(identifier); partitionCache.put(identifier, result); } } @@ -341,8 +340,7 @@ public CacheSizes estimatedCacheSizes() { } long partitionCacheSize = 0L; if (partitionCache != null) { - for (Map.Entry> entry : - partitionCache.asMap().entrySet()) { + for (Map.Entry> entry : partitionCache.asMap().entrySet()) { partitionCacheSize += entry.getValue().size(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 0e1482c87b80..e90d3c79c51c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -20,7 +20,7 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; @@ -255,12 +255,12 @@ void dropPartition(Identifier identifier, Map partition) throws TableNotExistException, PartitionNotExistException; /** - * Get PartitionEntry of all partitions of the table. + * Get Partition of all partitions of the table. * * @param identifier path of the table to list partitions * @throws TableNotExistException if the table does not exist */ - List listPartitions(Identifier identifier) throws TableNotExistException; + List listPartitions(Identifier identifier) throws TableNotExistException; /** * Modify an existing table from a {@link SchemaChange}. diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index d454547e31af..cddb76e6835b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -19,14 +19,22 @@ package org.apache.paimon.catalog; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.system.SystemTableLoader; +import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Preconditions; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME; +import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGCY_NAME; import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; @@ -117,4 +125,27 @@ public static Table createSystemTable(Identifier identifier, Table originTable) } return table; } + + public static List listPartitionsFromFileSystem(Table table) { + Options options = Options.fromMap(table.options()); + InternalRowPartitionComputer computer = + new InternalRowPartitionComputer( + options.get(PARTITION_DEFAULT_NAME), + table.rowType(), + table.partitionKeys().toArray(new String[0]), + options.get(PARTITION_GENERATE_LEGCY_NAME)); + List partitionEntries = + table.newReadBuilder().newScan().listPartitionEntries(); + List partitions = new ArrayList<>(partitionEntries.size()); + for (PartitionEntry entry : partitionEntries) { + partitions.add( + new Partition( + computer.generatePartValues(entry.partition()), + entry.recordCount(), + entry.fileSizeInBytes(), + entry.fileCount(), + entry.lastFileCreationTime())); + } + return partitions; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 968f00cfcae5..e2d1a94cfaff 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -19,7 +19,7 @@ package org.apache.paimon.catalog; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; @@ -165,8 +165,7 @@ public void dropPartition(Identifier identifier, Map partitions) } @Override - public List listPartitions(Identifier identifier) - throws TableNotExistException { + public List listPartitions(Identifier identifier) throws TableNotExistException { return wrapped.listPartitions(identifier); } diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java index ccf5f3853873..f24049eca9bf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java @@ -18,6 +18,8 @@ package org.apache.paimon.metastore; +import org.apache.paimon.partition.Partition; + import java.io.Serializable; import java.util.LinkedHashMap; import java.util.List; @@ -38,9 +40,7 @@ public interface MetastoreClient extends AutoCloseable { void markPartitionDone(LinkedHashMap partition) throws Exception; - default void alterPartition( - LinkedHashMap partition, PartitionStats partitionStats) - throws Exception { + default void alterPartition(Partition partition) throws Exception { throw new UnsupportedOperationException(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java b/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java deleted file mode 100644 index eacc400f52c3..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/PartitionStats.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.metastore; - -/** Statistic for partition. */ -public interface PartitionStats { - - long numFiles(); - - long totalSize(); - - long numRows(); - - long lastUpdateTimeMillis(); - - static PartitionStats create( - long numFiles, long totalSize, long numRows, long lastUpdateTimeMillis) { - return new PartitionStats() { - - @Override - public long numFiles() { - return numFiles; - } - - @Override - public long totalSize() { - return totalSize; - } - - @Override - public long numRows() { - return numRows; - } - - @Override - public long lastUpdateTimeMillis() { - return lastUpdateTimeMillis; - } - - @Override - public String toString() { - return String.format( - "numFiles: %s, totalSize: %s, numRows: %s, lastUpdateTimeMillis: %s", - numFiles, totalSize, numRows, lastUpdateTimeMillis); - } - }; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java b/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java new file mode 100644 index 000000000000..b13082fb4430 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/partition/Partition.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.partition; + +import org.apache.paimon.annotation.Public; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.Serializable; +import java.util.Map; +import java.util.Objects; + +/** Entry representing a partition. */ +@JsonIgnoreProperties(ignoreUnknown = true) +@Public +public class Partition implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final String FIELD_SPEC = "spec"; + public static final String FIELD_RECORD_COUNT = "recordCount"; + public static final String FIELD_FILE_SIZE_IN_BYTES = "fileSizeInBytes"; + public static final String FIELD_FILE_COUNT = "fileCount"; + public static final String FIELD_LAST_FILE_CREATION_TIME = "lastFileCreationTime"; + + @JsonProperty(FIELD_SPEC) + private final Map spec; + + @JsonProperty(FIELD_RECORD_COUNT) + private final long recordCount; + + @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) + private final long fileSizeInBytes; + + @JsonProperty(FIELD_FILE_COUNT) + private final long fileCount; + + @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) + private final long lastFileCreationTime; + + @JsonCreator + public Partition( + @JsonProperty(FIELD_SPEC) Map spec, + @JsonProperty(FIELD_RECORD_COUNT) long recordCount, + @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes, + @JsonProperty(FIELD_FILE_COUNT) long fileCount, + @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long lastFileCreationTime) { + this.spec = spec; + this.recordCount = recordCount; + this.fileSizeInBytes = fileSizeInBytes; + this.fileCount = fileCount; + this.lastFileCreationTime = lastFileCreationTime; + } + + @JsonGetter(FIELD_SPEC) + public Map spec() { + return spec; + } + + @JsonGetter(FIELD_RECORD_COUNT) + public long recordCount() { + return recordCount; + } + + @JsonGetter(FIELD_FILE_SIZE_IN_BYTES) + public long fileSizeInBytes() { + return fileSizeInBytes; + } + + @JsonGetter(FIELD_FILE_COUNT) + public long fileCount() { + return fileCount; + } + + @JsonGetter(FIELD_LAST_FILE_CREATION_TIME) + public long lastFileCreationTime() { + return lastFileCreationTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Partition that = (Partition) o; + return recordCount == that.recordCount + && fileSizeInBytes == that.fileSizeInBytes + && fileCount == that.fileCount + && lastFileCreationTime == that.lastFileCreationTime + && Objects.equals(spec, that.spec); + } + + @Override + public int hashCode() { + return Objects.hash(spec, recordCount, fileSizeInBytes, fileCount, lastFileCreationTime); + } + + @Override + public String toString() { + return "{" + + "spec=" + + spec + + ", recordCount=" + + recordCount + + ", fileSizeInBytes=" + + fileSizeInBytes + + ", fileCount=" + + fileCount + + ", lastFileCreationTime=" + + lastFileCreationTime + + '}'; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 152907db4ce8..c547656e7cb7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -26,14 +26,12 @@ import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; -import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.rest.auth.AuthSession; import org.apache.paimon.rest.auth.CredentialsProvider; import org.apache.paimon.rest.auth.CredentialsProviderFactory; @@ -65,7 +63,6 @@ import org.apache.paimon.table.Table; import org.apache.paimon.table.object.ObjectTable; import org.apache.paimon.table.sink.BatchTableCommit; -import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; @@ -86,12 +83,11 @@ import java.util.concurrent.ScheduledExecutorService; import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE; -import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; +import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; -import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternalRow; import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool; @@ -419,12 +415,11 @@ public void dropPartition(Identifier identifier, Map partition) } @Override - public List listPartitions(Identifier identifier) - throws TableNotExistException { + public List listPartitions(Identifier identifier) throws TableNotExistException { Table table = getTable(identifier); Options options = Options.fromMap(table.options()); if (!options.get(METASTORE_PARTITIONED_TABLE)) { - return table.newReadBuilder().newScan().listPartitionEntries(); + return listPartitionsFromFileSystem(table); } ListPartitionsResponse response; @@ -445,22 +440,7 @@ public List listPartitions(Identifier identifier) return Collections.emptyList(); } - RowType partitionType = table.rowType().project(table.partitionKeys()); - InternalRowSerializer serializer = new InternalRowSerializer(partitionType); - String defaultName = options.get(PARTITION_DEFAULT_NAME); - List result = new ArrayList<>(); - for (PartitionResponse partition : response.getPartitions()) { - GenericRow row = - convertSpecToInternalRow(partition.getSpec(), partitionType, defaultName); - result.add( - new PartitionEntry( - serializer.toBinaryRow(row).copy(), - partition.getRecordCount(), - partition.getFileSizeInBytes(), - partition.getFileCount(), - partition.getLastFileCreationTime())); - } - return result; + return response.getPartitions(); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java index 1f194d208e99..9a3ea8d4ee04 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java @@ -18,6 +18,7 @@ package org.apache.paimon.rest.responses; +import org.apache.paimon.partition.Partition; import org.apache.paimon.rest.RESTResponse; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -34,16 +35,15 @@ public class ListPartitionsResponse implements RESTResponse { public static final String FIELD_PARTITIONS = "partitions"; @JsonProperty(FIELD_PARTITIONS) - private final List partitions; + private final List partitions; @JsonCreator - public ListPartitionsResponse( - @JsonProperty(FIELD_PARTITIONS) List partitions) { + public ListPartitionsResponse(@JsonProperty(FIELD_PARTITIONS) List partitions) { this.partitions = partitions; } @JsonGetter(FIELD_PARTITIONS) - public List getPartitions() { + public List getPartitions() { return partitions; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/PartitionResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/PartitionResponse.java index 2706b5d7daf9..f4486b9260d0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/responses/PartitionResponse.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/PartitionResponse.java @@ -18,6 +18,7 @@ package org.apache.paimon.rest.responses; +import org.apache.paimon.partition.Partition; import org.apache.paimon.rest.RESTResponse; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -25,69 +26,22 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import java.util.Map; - /** Partition for rest api. */ @JsonIgnoreProperties(ignoreUnknown = true) public class PartitionResponse implements RESTResponse { - public static final String FIELD_SPEC = "spec"; - public static final String FIELD_RECORD_COUNT = "recordCount"; - public static final String FIELD_FILE_SIZE_IN_BYTES = "fileSizeInBytes"; - public static final String FIELD_FILE_COUNT = "fileCount"; - public static final String FIELD_LAST_FILE_CREATION_TIME = "lastFileCreationTime"; - - @JsonProperty(FIELD_SPEC) - private final Map spec; - - @JsonProperty(FIELD_RECORD_COUNT) - private final long recordCount; - - @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) - private final long fileSizeInBytes; + public static final String FIELD_PARTITION = "partition"; - @JsonProperty(FIELD_FILE_COUNT) - private final long fileCount; - - @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) - private final long lastFileCreationTime; + @JsonProperty(FIELD_PARTITION) + private final Partition partition; @JsonCreator - public PartitionResponse( - @JsonProperty(FIELD_SPEC) Map spec, - @JsonProperty(FIELD_RECORD_COUNT) long recordCount, - @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes, - @JsonProperty(FIELD_FILE_COUNT) long fileCount, - @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long lastFileCreationTime) { - this.spec = spec; - this.recordCount = recordCount; - this.fileSizeInBytes = fileSizeInBytes; - this.fileCount = fileCount; - this.lastFileCreationTime = lastFileCreationTime; - } - - @JsonGetter(FIELD_SPEC) - public Map getSpec() { - return spec; - } - - @JsonGetter(FIELD_RECORD_COUNT) - public long getRecordCount() { - return recordCount; - } - - @JsonGetter(FIELD_FILE_SIZE_IN_BYTES) - public long getFileSizeInBytes() { - return fileSizeInBytes; - } - - @JsonGetter(FIELD_FILE_COUNT) - public long getFileCount() { - return fileCount; + public PartitionResponse(@JsonProperty(FIELD_PARTITION) Partition partition) { + this.partition = partition; } - @JsonGetter(FIELD_LAST_FILE_CREATION_TIME) - public long getLastFileCreationTime() { - return lastFileCreationTime; + @JsonGetter(FIELD_PARTITION) + public Partition getPartition() { + return partition; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java index fee6d1433143..c028fa7421d5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java @@ -21,9 +21,9 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.Path; -import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; @@ -245,12 +245,12 @@ public void testPartitionCache() throws Exception { Collections.emptyMap(), ""); catalog.createTable(tableIdent, schema, false); - List partitionEntryList = catalog.listPartitions(tableIdent); + List partitionEntryList = catalog.listPartitions(tableIdent); assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent); catalog.invalidateTable(tableIdent); catalog.refreshPartitions(tableIdent); assertThat(catalog.partitionCache().asMap()).containsKey(tableIdent); - List partitionEntryListFromCache = + List partitionEntryListFromCache = catalog.partitionCache().getIfPresent(tableIdent); assertThat(partitionEntryListFromCache).isNotNull(); assertThat(partitionEntryListFromCache).containsAll(partitionEntryList); diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java index 1d4a9b0e8a58..0eaf23a1a28d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java @@ -18,8 +18,8 @@ package org.apache.paimon.catalog; -import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.MemorySize; +import org.apache.paimon.partition.Partition; import org.apache.paimon.table.Table; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; @@ -56,7 +56,7 @@ public Cache tableCache() { return tableCache; } - public Cache> partitionCache() { + public Cache> partitionCache() { partitionCache.cleanUp(); return partitionCache; } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java index 9b686b683773..4b228d93c6f7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.partition.Partition; import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.AlterTableRequest; import org.apache.paimon.rest.requests.CreateDatabaseRequest; @@ -149,11 +150,11 @@ public static DropPartitionRequest dropPartitionRequest() { public static PartitionResponse partitionResponse() { Map spec = new HashMap<>(); spec.put("f0", "1"); - return new PartitionResponse(spec, 1, 1, 1, 1); + return new PartitionResponse(new Partition(spec, 1, 1, 1, 1)); } public static ListPartitionsResponse listPartitionsResponse() { - PartitionResponse partition = partitionResponse(); + Partition partition = partitionResponse().getPartition(); return new ListPartitionsResponse(ImmutableList.of(partition)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index c24bc0534c61..627b02c1e3b7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -22,9 +22,9 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.rest.requests.CreateTableRequest; import org.apache.paimon.rest.responses.AlterDatabaseResponse; import org.apache.paimon.rest.responses.CreateDatabaseResponse; @@ -424,7 +424,7 @@ public void testListPartitionsWhenMetastorePartitionedIsTrue() throws Exception mockResponse(mapper.writeValueAsString(getTableResponse), 200); ListPartitionsResponse response = MockRESTMessage.listPartitionsResponse(); mockResponse(mapper.writeValueAsString(response), 200); - List result = + List result = restCatalog.listPartitions(Identifier.create(databaseName, "table")); assertEquals(response.getPartitions().size(), result.size()); } @@ -435,7 +435,7 @@ public void testListPartitionsFromFile() throws Exception { GetTableResponse response = MockRESTMessage.getTableResponseEnablePartition(); mockResponse(mapper.writeValueAsString(response), 200); mockResponse(mapper.writeValueAsString(response), 200); - List partitionEntries = + List partitionEntries = restCatalog.listPartitions(Identifier.create(databaseName, "table")); assertEquals(partitionEntries.size(), 0); } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java index 6712b7b991f3..38a6e08751f9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java @@ -225,8 +225,8 @@ public void listPartitionsResponseParseTest() throws Exception { ListPartitionsResponse parseData = mapper.readValue(responseStr, ListPartitionsResponse.class); assertEquals( - response.getPartitions().get(0).getFileCount(), - parseData.getPartitions().get(0).getFileCount()); + response.getPartitions().get(0).fileCount(), + parseData.getPartitions().get(0).fileCount()); } @Test diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java index ced37726f1eb..84542af4768b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java @@ -22,7 +22,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.metastore.MetastoreClient; -import org.apache.paimon.metastore.PartitionStats; +import org.apache.paimon.partition.Partition; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ScanMode; @@ -82,10 +82,10 @@ public void report(String partition, long modifyTimeMillis) throws Exception { } } - PartitionStats partitionStats = - PartitionStats.create(fileCount, totalSize, rowCount, modifyTimeMillis); + Partition partitionStats = + new Partition(partitionSpec, fileCount, totalSize, rowCount, modifyTimeMillis); LOG.info("alter partition {} with statistic {}.", partitionSpec, partitionStats); - metastoreClient.alterPartition(partitionSpec, partitionStats); + metastoreClient.alterPartition(partitionStats); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java index 3bdbdd20ad3e..3c5cd2f8e927 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.sink.partition; import org.apache.paimon.metastore.MetastoreClient; -import org.apache.paimon.metastore.PartitionStats; +import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.actions.AddDonePartitionAction; import org.junit.jupiter.api.Test; @@ -68,9 +68,7 @@ public void markPartitionDone(LinkedHashMap partitions) { } @Override - public void alterPartition( - LinkedHashMap partitionSpec, - PartitionStats partitionStats) { + public void alterPartition(Partition partition) { throw new UnsupportedOperationException(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java index 0f761efa2278..3c01772d6d3b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java @@ -23,7 +23,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.metastore.MetastoreClient; -import org.apache.paimon.metastore.PartitionStats; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; @@ -85,7 +85,7 @@ public void testReportAction() throws Exception { BatchTableCommit committer = table.newBatchWriteBuilder().newCommit(); committer.commit(messages); AtomicBoolean closed = new AtomicBoolean(false); - Map partitionParams = Maps.newHashMap(); + Map partitionParams = Maps.newHashMap(); MetastoreClient client = new MetastoreClient() { @@ -116,12 +116,12 @@ public void markPartitionDone(LinkedHashMap partitionSpec) { } @Override - public void alterPartition( - LinkedHashMap partitionSpec, - PartitionStats partitionStats) { + public void alterPartition(Partition partition) { partitionParams.put( - PartitionPathUtils.generatePartitionPath(partitionSpec), - partitionStats); + PartitionPathUtils.generatePartitionPath( + partition.spec(), + table.rowType().project(table.partitionKeys())), + partition); } @Override @@ -136,7 +136,7 @@ public void close() { Assertions.assertThat(partitionParams).containsKey("c1=a/"); Assertions.assertThat(partitionParams.get("c1=a/").toString()) .isEqualTo( - "numFiles: 1, totalSize: 591, numRows: 1, lastUpdateTimeMillis: 1729598544974"); + "{spec={c1=a}, recordCount=1, fileSizeInBytes=591, fileCount=1, lastFileCreationTime=1729598544974}"); action.close(); Assertions.assertThat(closed).isTrue(); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java index 0661988648f4..755b2df2069f 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java @@ -23,13 +23,13 @@ import org.apache.paimon.client.ClientPool; import org.apache.paimon.hive.pool.CachedClientPool; import org.apache.paimon.metastore.MetastoreClient; -import org.apache.paimon.metastore.PartitionStats; import org.apache.paimon.options.Options; import org.apache.paimon.utils.PartitionPathUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; @@ -57,6 +57,7 @@ public class HiveMetastoreClient implements MetastoreClient { private final Identifier identifier; private final ClientPool clients; + private final List partitionKeys; private final StorageDescriptor sd; private final String dataFilePath; @@ -69,6 +70,10 @@ public class HiveMetastoreClient implements MetastoreClient { client -> client.getTable( identifier.getDatabaseName(), identifier.getTableName())); + this.partitionKeys = + table.getPartitionKeys().stream() + .map(FieldSchema::getName) + .collect(Collectors.toList()); this.sd = table.getSd(); this.dataFilePath = table.getParameters().containsKey(CoreOptions.DATA_FILE_PATH_DIRECTORY.key()) @@ -103,17 +108,17 @@ public void addPartitions(List> partitions) throws } @Override - public void alterPartition( - LinkedHashMap partition, PartitionStats partitionStats) - throws Exception { - List partitionValues = new ArrayList<>(partition.values()); + public void alterPartition(org.apache.paimon.partition.Partition partition) throws Exception { + Map spec = partition.spec(); + List partitionValues = + partitionKeys.stream().map(spec::get).collect(Collectors.toList()); Map statistic = new HashMap<>(); - statistic.put(NUM_FILES_PROP, String.valueOf(partitionStats.numFiles())); - statistic.put(TOTAL_SIZE_PROP, String.valueOf(partitionStats.totalSize())); - statistic.put(NUM_ROWS_PROP, String.valueOf(partitionStats.numRows())); + statistic.put(NUM_FILES_PROP, String.valueOf(partition.fileCount())); + statistic.put(TOTAL_SIZE_PROP, String.valueOf(partition.fileSizeInBytes())); + statistic.put(NUM_ROWS_PROP, String.valueOf(partition.recordCount())); - String modifyTimeSeconds = String.valueOf(partitionStats.lastUpdateTimeMillis() / 1000); + String modifyTimeSeconds = String.valueOf(partition.lastFileCreationTime() / 1000); statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds); // just for being compatible with hive metastore @@ -128,7 +133,7 @@ public void alterPartition( identifier.getObjectName(), partitionValues)); hivePartition.setValues(partitionValues); - hivePartition.setLastAccessTime((int) (partitionStats.lastUpdateTimeMillis() / 1000)); + hivePartition.setLastAccessTime((int) (partition.lastFileCreationTime() / 1000)); hivePartition.getParameters().putAll(statistic); clients.execute( client -> diff --git a/paimon-open-api/rest-catalog-open-api.yaml b/paimon-open-api/rest-catalog-open-api.yaml index 7a0c9663b4f2..9e4d13b44c35 100644 --- a/paimon-open-api/rest-catalog-open-api.yaml +++ b/paimon-open-api/rest-catalog-open-api.yaml @@ -880,8 +880,13 @@ components: partitions: type: array items: - $ref: '#/components/schemas/PartitionResponse' + $ref: '#/components/schemas/Partition' PartitionResponse: + type: object + properties: + partition: + $ref: '#/components/schemas/Partition' + Partition: type: object properties: spec: diff --git a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java index 62f99876a397..98f02784d91e 100644 --- a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java +++ b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java @@ -18,6 +18,7 @@ package org.apache.paimon.open.api; +import org.apache.paimon.partition.Partition; import org.apache.paimon.rest.ResourcePaths; import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.AlterTableRequest; @@ -377,7 +378,7 @@ public ListPartitionsResponse listPartitions( @PathVariable String table) { Map spec = new HashMap<>(); spec.put("f1", "1"); - PartitionResponse partition = new PartitionResponse(spec, 1, 2, 3, 4); + Partition partition = new Partition(spec, 1, 2, 3, 4); return new ListPartitionsResponse(ImmutableList.of(partition)); } @@ -404,7 +405,7 @@ public PartitionResponse createPartition( @RequestBody CreatePartitionRequest request) { Map spec = new HashMap<>(); spec.put("f1", "1"); - return new PartitionResponse(spec, 0, 0, 0, 4); + return new PartitionResponse(new Partition(spec, 0, 0, 0, 4)); } @Operation(