From 3395f076c053969429989b6406c251dbcd1c83df Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Wed, 8 Jan 2025 16:46:54 +0800 Subject: [PATCH 1/3] [core] Introduce catalog loader in Catalog to replace MetastoreClient --- .../org/apache/paimon/AbstractFileStore.java | 23 +- .../paimon/catalog/AbstractCatalog.java | 8 +- .../apache/paimon/catalog/CachingCatalog.java | 88 +++---- .../paimon/catalog/CachingCatalogLoader.java | 40 +++ .../org/apache/paimon/catalog/Catalog.java | 75 +++--- .../apache/paimon/catalog/CatalogLoader.java | 3 + .../paimon/catalog/DelegateCatalog.java | 2 +- .../paimon/catalog/FileSystemCatalog.java | 5 + .../catalog/FileSystemCatalogLoader.java | 44 ++++ .../org/apache/paimon/jdbc/JdbcCatalog.java | 6 + .../apache/paimon/jdbc/JdbcCatalogLoader.java | 47 ++++ .../metastore/AddPartitionCommitCallback.java | 11 +- .../metastore/AddPartitionTagCallback.java | 14 +- .../paimon/metastore/MetastoreClient.java | 52 ---- .../paimon/operation/PartitionExpire.java | 36 ++- .../actions/AddDonePartitionAction.java | 14 +- .../actions/MarkPartitionDoneEventAction.java | 13 +- .../actions/PartitionMarkDoneAction.java | 16 +- .../paimon/privilege/PrivilegedCatalog.java | 37 ++- .../privilege/PrivilegedCatalogLoader.java | 55 ++++ .../org/apache/paimon/rest/RESTCatalog.java | 6 + .../paimon/table/AbstractFileStoreTable.java | 44 ++-- .../paimon/table/CatalogEnvironment.java | 15 +- .../apache/paimon/table/PartitionHandler.java | 74 ++++++ .../paimon/catalog/CachingCatalogTest.java | 23 +- .../catalog/TestableCachingCatalog.java | 22 +- .../paimon/operation/PartitionExpireTest.java | 77 +++--- .../privilege/PrivilegedCatalogTest.java | 21 +- .../procedure/ExpirePartitionsProcedure.java | 9 +- .../flink/action/ExpirePartitionsAction.java | 9 +- .../procedure/ExpirePartitionsProcedure.java | 9 +- .../PartitionStatisticsReporter.java | 16 +- .../partition/ReportPartStatsListener.java | 9 +- .../partition/AddDonePartitionActionTest.java | 48 ++-- .../PartitionStatisticsReporterTest.java | 41 ++- .../org/apache/paimon/hive/HiveCatalog.java | 104 ++++---- .../apache/paimon/hive/HiveCatalogLoader.java | 54 ++++ .../paimon/hive/HiveMetastoreClient.java | 246 ------------------ .../paimon/hive/HiveCatalogITCaseBase.java | 12 +- .../procedure/ExpirePartitionsProcedure.java | 9 +- .../spark/PaimonPartitionManagement.scala | 41 ++- .../sql/DDLWithHiveCatalogTestBase.scala | 20 +- 42 files changed, 758 insertions(+), 740 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalogLoader.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLoader.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java delete mode 100644 paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalogLoader.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/PartitionHandler.java create mode 100644 paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLoader.java delete mode 100644 paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 22987c6292e5..d2efe5aef566 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -29,7 +29,6 @@ import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.metastore.AddPartitionTagCallback; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.FileStoreCommitImpl; import org.apache.paimon.operation.ManifestsReader; @@ -45,6 +44,7 @@ import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.CatalogEnvironment; +import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.table.sink.CallbackUtils; import org.apache.paimon.table.sink.CommitCallback; import org.apache.paimon.table.sink.TagCallback; @@ -345,11 +345,9 @@ public PartitionExpire newPartitionExpire(String commitUser) { return null; } - MetastoreClient.Factory metastoreClientFactory = - catalogEnvironment.metastoreClientFactory(); - MetastoreClient metastoreClient = null; - if (options.partitionedTableInMetastore() && metastoreClientFactory != null) { - metastoreClient = metastoreClientFactory.create(); + PartitionHandler partitionHandler = null; + if (options.partitionedTableInMetastore()) { + partitionHandler = catalogEnvironment.partitionHandler(); } return new PartitionExpire( @@ -358,7 +356,7 @@ public PartitionExpire newPartitionExpire(String commitUser) { PartitionExpireStrategy.createPartitionExpireStrategy(options, partitionType()), newScan(), newCommit(commitUser), - metastoreClient, + partitionHandler, options.endInputCheckPartitionExpire(), options.partitionExpireMaxNum()); } @@ -377,11 +375,12 @@ public TagAutoManager newTagCreationManager() { public List createTagCallbacks() { List callbacks = new ArrayList<>(CallbackUtils.loadTagCallbacks(options)); String partitionField = options.tagToPartitionField(); - MetastoreClient.Factory metastoreClientFactory = - catalogEnvironment.metastoreClientFactory(); - if (partitionField != null && metastoreClientFactory != null) { - callbacks.add( - new AddPartitionTagCallback(metastoreClientFactory.create(), partitionField)); + + if (partitionField != null) { + PartitionHandler partitionHandler = catalogEnvironment.partitionHandler(); + if (partitionHandler != null) { + callbacks.add(new AddPartitionTagCallback(partitionHandler, partitionField)); + } } if (options.tagCreateSuccessFile()) { callbacks.add(new SuccessFileTagCallback(fileIO, newTagManager().tagDirectory())); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index a6790004a116..2ecbcf61b334 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -24,7 +24,6 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.Options; @@ -410,7 +409,7 @@ protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExist lockFactory().orElse(null), lockContext().orElse(null), identifier), - metastoreClientFactory(identifier).orElse(null))); + catalogLoader())); CoreOptions options = table.coreOptions(); if (options.type() == TableType.OBJECT_TABLE) { String objectLocation = options.objectLocation(); @@ -477,11 +476,6 @@ protected TableMeta getDataTableMeta(Identifier identifier) throws TableNotExist protected abstract TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException; - /** Get metastore client factory for the table specified by {@code identifier}. */ - public Optional metastoreClientFactory(Identifier identifier) { - return Optional.empty(); - } - public Path getTableLocation(Identifier identifier) { return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getTableName()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index 23408e569250..eb1110bb78c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -18,6 +18,7 @@ package org.apache.paimon.catalog; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.fs.Path; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; @@ -53,60 +54,45 @@ /** A {@link Catalog} to cache databases and tables and manifests. */ public class CachingCatalog extends DelegateCatalog { + private final Options options; + private final Duration expirationInterval; private final int snapshotMaxNumPerTable; + private final long cachedPartitionMaxNum; - protected final Cache databaseCache; - protected final Cache tableCache; + protected Cache databaseCache; + protected Cache tableCache; @Nullable protected final SegmentsCache manifestCache; // partition cache will affect data latency - @Nullable protected final Cache> partitionCache; - - public CachingCatalog(Catalog wrapped) { - this( - wrapped, - CACHE_EXPIRATION_INTERVAL_MS.defaultValue(), - CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(), - CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes(), - CACHE_PARTITION_MAX_NUM.defaultValue(), - CACHE_SNAPSHOT_MAX_NUM_PER_TABLE.defaultValue()); - } - - public CachingCatalog( - Catalog wrapped, - Duration expirationInterval, - MemorySize manifestMaxMemory, - long manifestCacheThreshold, - long cachedPartitionMaxNum, - int snapshotMaxNumPerTable) { - this( - wrapped, - expirationInterval, - manifestMaxMemory, - manifestCacheThreshold, - cachedPartitionMaxNum, - snapshotMaxNumPerTable, - Ticker.systemTicker()); - } + @Nullable protected Cache> partitionCache; - public CachingCatalog( - Catalog wrapped, - Duration expirationInterval, - MemorySize manifestMaxMemory, - long manifestCacheThreshold, - long cachedPartitionMaxNum, - int snapshotMaxNumPerTable, - Ticker ticker) { + public CachingCatalog(Catalog wrapped, Options options) { super(wrapped); + this.options = options; + MemorySize manifestMaxMemory = options.get(CACHE_MANIFEST_SMALL_FILE_MEMORY); + long manifestCacheThreshold = options.get(CACHE_MANIFEST_SMALL_FILE_THRESHOLD).getBytes(); + Optional maxMemory = options.getOptional(CACHE_MANIFEST_MAX_MEMORY); + if (maxMemory.isPresent() && maxMemory.get().compareTo(manifestMaxMemory) > 0) { + // cache all manifest files + manifestMaxMemory = maxMemory.get(); + manifestCacheThreshold = Long.MAX_VALUE; + } + + this.expirationInterval = options.get(CACHE_EXPIRATION_INTERVAL_MS); if (expirationInterval.isZero() || expirationInterval.isNegative()) { throw new IllegalArgumentException( "When cache.expiration-interval is set to negative or 0, the catalog cache should be disabled."); } + this.snapshotMaxNumPerTable = options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE); + this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold); - this.expirationInterval = expirationInterval; - this.snapshotMaxNumPerTable = snapshotMaxNumPerTable; + this.cachedPartitionMaxNum = options.get(CACHE_PARTITION_MAX_NUM); + init(Ticker.systemTicker()); + } + @VisibleForTesting + void init(Ticker ticker) { this.databaseCache = Caffeine.newBuilder() .softValues() @@ -121,7 +107,6 @@ public CachingCatalog( .expireAfterAccess(expirationInterval) .ticker(ticker) .build(); - this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold); this.partitionCache = cachedPartitionMaxNum == 0 ? null @@ -142,21 +127,12 @@ public static Catalog tryToCreate(Catalog catalog, Options options) { return catalog; } - MemorySize manifestMaxMemory = options.get(CACHE_MANIFEST_SMALL_FILE_MEMORY); - long manifestThreshold = options.get(CACHE_MANIFEST_SMALL_FILE_THRESHOLD).getBytes(); - Optional maxMemory = options.getOptional(CACHE_MANIFEST_MAX_MEMORY); - if (maxMemory.isPresent() && maxMemory.get().compareTo(manifestMaxMemory) > 0) { - // cache all manifest files - manifestMaxMemory = maxMemory.get(); - manifestThreshold = Long.MAX_VALUE; - } - return new CachingCatalog( - catalog, - options.get(CACHE_EXPIRATION_INTERVAL_MS), - manifestMaxMemory, - manifestThreshold, - options.get(CACHE_PARTITION_MAX_NUM), - options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE)); + return new CachingCatalog(catalog, options); + } + + @Override + public CatalogLoader catalogLoader() { + return new CachingCatalogLoader(wrapped.catalogLoader(), options); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalogLoader.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalogLoader.java new file mode 100644 index 000000000000..f400f7126f53 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalogLoader.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.options.Options; + +/** Loader to create {@link CachingCatalog}. */ +public class CachingCatalogLoader implements CatalogLoader { + + private static final long serialVersionUID = 1L; + + private final CatalogLoader catalogLoader; + private final Options options; + + public CachingCatalogLoader(CatalogLoader catalogLoader, Options options) { + this.catalogLoader = catalogLoader; + this.options = options; + } + + @Override + public Catalog load() { + return CachingCatalog.tryToCreate(catalogLoader.load(), options); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index e7d07d6dc433..d0ad86c224d7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -40,39 +40,6 @@ @Public public interface Catalog extends AutoCloseable { - // constants for system table and database - String SYSTEM_TABLE_SPLITTER = "$"; - String SYSTEM_DATABASE_NAME = "sys"; - String SYSTEM_BRANCH_PREFIX = "branch_"; - - // constants for table and database - String COMMENT_PROP = "comment"; - String OWNER_PROP = "owner"; - - // constants for database - String DEFAULT_DATABASE = "default"; - String DB_SUFFIX = ".db"; - String DB_LOCATION_PROP = "location"; - - // constants for table - String TABLE_DEFAULT_OPTION_PREFIX = "table-default."; - String NUM_ROWS_PROP = "numRows"; - String NUM_FILES_PROP = "numFiles"; - String TOTAL_SIZE_PROP = "totalSize"; - String LAST_UPDATE_TIME_PROP = "lastUpdateTime"; - - /** Warehouse root path for creating new databases. */ - String warehouse(); - - /** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */ - FileIO fileIO(); - - /** Catalog options for re-creating this catalog. */ - Map options(); - - /** Return a boolean that indicates whether this catalog is case-sensitive. */ - boolean caseSensitive(); - // ======================= database methods =============================== /** @@ -399,6 +366,48 @@ default void repairTable(Identifier identifier) throws TableNotExistException { throw new UnsupportedOperationException(); } + // ==================== Catalog Information ========================== + + /** Warehouse root path for creating new databases. */ + String warehouse(); + + /** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */ + FileIO fileIO(); + + /** Catalog options for re-creating this catalog. */ + Map options(); + + /** Serializable loader to create catalog. */ + CatalogLoader catalogLoader(); + + /** Return a boolean that indicates whether this catalog is case-sensitive. */ + boolean caseSensitive(); + + // ======================= Constants =============================== + + // constants for system table and database + String SYSTEM_TABLE_SPLITTER = "$"; + String SYSTEM_DATABASE_NAME = "sys"; + String SYSTEM_BRANCH_PREFIX = "branch_"; + + // constants for table and database + String COMMENT_PROP = "comment"; + String OWNER_PROP = "owner"; + + // constants for database + String DEFAULT_DATABASE = "default"; + String DB_SUFFIX = ".db"; + String DB_LOCATION_PROP = "location"; + + // constants for table + String TABLE_DEFAULT_OPTION_PREFIX = "table-default."; + String NUM_ROWS_PROP = "numRows"; + String NUM_FILES_PROP = "numFiles"; + String TOTAL_SIZE_PROP = "totalSize"; + String LAST_UPDATE_TIME_PROP = "lastUpdateTime"; + + // ======================= Exceptions =============================== + /** Exception for trying to drop on a database that is not empty. */ class DatabaseNotEmptyException extends Exception { private static final String MSG = "Database %s is not empty."; diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java index c8de08139cb7..0f75a313d7c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java @@ -18,9 +18,12 @@ package org.apache.paimon.catalog; +import org.apache.paimon.annotation.Public; + import java.io.Serializable; /** Loader for creating a {@link Catalog}. */ +@Public @FunctionalInterface public interface CatalogLoader extends Serializable { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 23c50e998635..aa7852456e5a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -29,7 +29,7 @@ import java.util.Map; /** A {@link Catalog} to delegate all operations to another {@link Catalog}. */ -public class DelegateCatalog implements Catalog { +public abstract class DelegateCatalog implements Catalog { protected final Catalog wrapped; diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index cb0c358259f8..254826b91da4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -162,6 +162,11 @@ public String warehouse() { return warehouse.toString(); } + @Override + public CatalogLoader catalogLoader() { + return new FileSystemCatalogLoader(fileIO, warehouse, catalogOptions); + } + @Override public boolean caseSensitive() { return catalogOptions.getOptional(CASE_SENSITIVE).orElse(true); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLoader.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLoader.java new file mode 100644 index 000000000000..0fb5da6c4d03 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogLoader.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; + +/** Loader to create {@link FileSystemCatalog}. */ +public class FileSystemCatalogLoader implements CatalogLoader { + + private static final long serialVersionUID = 1L; + + private final FileIO fileIO; + private final Path warehouse; + private final Options options; + + public FileSystemCatalogLoader(FileIO fileIO, Path warehouse, Options options) { + this.fileIO = fileIO; + this.warehouse = warehouse; + this.options = options; + } + + @Override + public Catalog load() { + return new FileSystemCatalog(fileIO, warehouse, options); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 63cb54c180f5..327c6b9676c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -20,6 +20,7 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.AbstractCatalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.CatalogLockContext; import org.apache.paimon.catalog.CatalogLockFactory; import org.apache.paimon.catalog.Database; @@ -149,6 +150,11 @@ public String warehouse() { return warehouse; } + @Override + public CatalogLoader catalogLoader() { + return new JdbcCatalogLoader(fileIO, catalogKey, options, warehouse); + } + @Override public List listDatabases() { List databases = Lists.newArrayList(); diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java new file mode 100644 index 000000000000..a1fa36e4b206 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLoader.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.options.Options; + +/** Loader to create {@link JdbcCatalog}. */ +public class JdbcCatalogLoader implements CatalogLoader { + + private static final long serialVersionUID = 1L; + + private final FileIO fileIO; + private final String catalogKey; + private final Options options; + private final String warehouse; + + public JdbcCatalogLoader(FileIO fileIO, String catalogKey, Options options, String warehouse) { + this.fileIO = fileIO; + this.catalogKey = catalogKey; + this.options = options; + this.warehouse = warehouse; + } + + @Override + public Catalog load() { + return new JdbcCatalog(fileIO, catalogKey, options, warehouse); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java index 26fb9ed48db2..dbd42d546669 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java @@ -23,6 +23,7 @@ import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.table.sink.CommitCallback; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.utils.InternalRowPartitionComputer; @@ -48,12 +49,12 @@ public class AddPartitionCommitCallback implements CommitCallback { .softValues() .build(); - private final MetastoreClient client; + private final PartitionHandler partitionHandler; private final InternalRowPartitionComputer partitionComputer; public AddPartitionCommitCallback( - MetastoreClient client, InternalRowPartitionComputer partitionComputer) { - this.client = client; + PartitionHandler partitionHandler, InternalRowPartitionComputer partitionComputer) { + this.partitionHandler = partitionHandler; this.partitionComputer = partitionComputer; } @@ -85,7 +86,7 @@ private void addPartitions(Set partitions) { } } if (!newPartitions.isEmpty()) { - client.addPartitions( + partitionHandler.createPartitions( newPartitions.stream() .map(partitionComputer::generatePartValues) .collect(Collectors.toList())); @@ -98,6 +99,6 @@ private void addPartitions(Set partitions) { @Override public void close() throws Exception { - client.close(); + partitionHandler.close(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java index 31bb521e88d1..ca8c7b769065 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionTagCallback.java @@ -18,18 +18,20 @@ package org.apache.paimon.metastore; +import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.table.sink.TagCallback; +import java.util.Collections; import java.util.LinkedHashMap; /** A {@link TagCallback} to add newly created partitions to metastore. */ public class AddPartitionTagCallback implements TagCallback { - private final MetastoreClient client; + private final PartitionHandler partitionHandler; private final String partitionField; - public AddPartitionTagCallback(MetastoreClient client, String partitionField) { - this.client = client; + public AddPartitionTagCallback(PartitionHandler partitionHandler, String partitionField) { + this.partitionHandler = partitionHandler; this.partitionField = partitionField; } @@ -38,7 +40,7 @@ public void notifyCreation(String tagName) { LinkedHashMap partitionSpec = new LinkedHashMap<>(); partitionSpec.put(partitionField, tagName); try { - client.addPartition(partitionSpec); + partitionHandler.createPartitions(Collections.singletonList(partitionSpec)); } catch (Exception e) { throw new RuntimeException(e); } @@ -49,7 +51,7 @@ public void notifyDeletion(String tagName) { LinkedHashMap partitionSpec = new LinkedHashMap<>(); partitionSpec.put(partitionField, tagName); try { - client.dropPartition(partitionSpec); + partitionHandler.dropPartitions(Collections.singletonList(partitionSpec)); } catch (Exception e) { throw new RuntimeException(e); } @@ -57,6 +59,6 @@ public void notifyDeletion(String tagName) { @Override public void close() throws Exception { - client.close(); + partitionHandler.close(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java b/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java deleted file mode 100644 index f24049eca9bf..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/metastore/MetastoreClient.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.metastore; - -import org.apache.paimon.partition.Partition; - -import java.io.Serializable; -import java.util.LinkedHashMap; -import java.util.List; - -/** - * A metastore client related to a table. All methods of this interface operate on the same specific - * table. - */ -public interface MetastoreClient extends AutoCloseable { - - void addPartition(LinkedHashMap partition) throws Exception; - - void addPartitions(List> partitions) throws Exception; - - void dropPartition(LinkedHashMap partition) throws Exception; - - void dropPartitions(List> partitions) throws Exception; - - void markPartitionDone(LinkedHashMap partition) throws Exception; - - default void alterPartition(Partition partition) throws Exception { - throw new UnsupportedOperationException(); - } - - /** Factory to create {@link MetastoreClient}. */ - interface Factory extends Serializable { - - MetastoreClient create(); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java index 68ef8a123746..e4d7352f6966 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java @@ -19,11 +19,12 @@ package org.apache.paimon.operation; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.manifest.PartitionEntry; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.partition.PartitionExpireStrategy; import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy; +import org.apache.paimon.table.PartitionHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +35,6 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -50,7 +50,7 @@ public class PartitionExpire { private final Duration checkInterval; private final FileStoreScan scan; private final FileStoreCommit commit; - private final MetastoreClient metastoreClient; + @Nullable private final PartitionHandler partitionHandler; private LocalDateTime lastCheck; private final PartitionExpireStrategy strategy; private final boolean endInputCheckPartitionExpire; @@ -62,7 +62,7 @@ public PartitionExpire( PartitionExpireStrategy strategy, FileStoreScan scan, FileStoreCommit commit, - @Nullable MetastoreClient metastoreClient, + @Nullable PartitionHandler partitionHandler, boolean endInputCheckPartitionExpire, int maxExpireNum) { this.expirationTime = expirationTime; @@ -70,7 +70,7 @@ public PartitionExpire( this.strategy = strategy; this.scan = scan; this.commit = commit; - this.metastoreClient = metastoreClient; + this.partitionHandler = partitionHandler; this.lastCheck = LocalDateTime.now(); this.endInputCheckPartitionExpire = endInputCheckPartitionExpire; this.maxExpireNum = maxExpireNum; @@ -82,7 +82,7 @@ public PartitionExpire( PartitionExpireStrategy strategy, FileStoreScan scan, FileStoreCommit commit, - @Nullable MetastoreClient metastoreClient, + @Nullable PartitionHandler partitionHandler, int maxExpireNum) { this( expirationTime, @@ -90,7 +90,7 @@ public PartitionExpire( strategy, scan, commit, - metastoreClient, + partitionHandler, false, maxExpireNum); } @@ -158,25 +158,19 @@ private List> doExpire( // convert partition value to partition string, and limit the partition num expired = convertToPartitionString(expiredPartValues); LOG.info("Expire Partitions: {}", expired); - if (metastoreClient != null) { - deleteMetastorePartitions(expired); + if (partitionHandler != null) { + try { + partitionHandler.dropPartitions(expired); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + } else { + commit.dropPartitions(expired, commitIdentifier); } - commit.dropPartitions(expired, commitIdentifier); } return expired; } - private void deleteMetastorePartitions(List> partitions) { - if (metastoreClient != null && partitions.size() > 0) { - try { - metastoreClient.dropPartitions( - partitions.stream().map(LinkedHashMap::new).collect(Collectors.toList())); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } - private List> convertToPartitionString( List> expiredPartValues) { return expiredPartValues.stream() diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java index c6db6cb6e63a..3d76a7d37455 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java @@ -19,11 +19,12 @@ package org.apache.paimon.partition.actions; import org.apache.paimon.fs.Path; -import org.apache.paimon.metastore.MetastoreClient; +import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; import java.io.IOException; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -31,10 +32,11 @@ /** A {@link PartitionMarkDoneAction} which add ".done" partition. */ public class AddDonePartitionAction implements PartitionMarkDoneAction { - private final MetastoreClient metastoreClient; - public AddDonePartitionAction(MetastoreClient metastoreClient) { - this.metastoreClient = metastoreClient; + private final PartitionHandler partitionHandler; + + public AddDonePartitionAction(PartitionHandler partitionHandler) { + this.partitionHandler = partitionHandler; } @Override @@ -42,7 +44,7 @@ public void markDone(String partition) throws Exception { LinkedHashMap doneSpec = extractPartitionSpecFromPath(new Path(partition)); Map.Entry lastField = tailEntry(doneSpec); doneSpec.put(lastField.getKey(), lastField.getValue() + ".done"); - metastoreClient.addPartition(doneSpec); + partitionHandler.createPartitions(Collections.singletonList(doneSpec)); } private Map.Entry tailEntry(LinkedHashMap partitionSpec) { @@ -52,7 +54,7 @@ private Map.Entry tailEntry(LinkedHashMap partit @Override public void close() throws IOException { try { - metastoreClient.close(); + partitionHandler.close(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java index 8cc1c93ba937..ae81c0dde1f2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java @@ -19,9 +19,10 @@ package org.apache.paimon.partition.actions; import org.apache.paimon.fs.Path; -import org.apache.paimon.metastore.MetastoreClient; +import org.apache.paimon.table.PartitionHandler; import java.io.IOException; +import java.util.Collections; import java.util.LinkedHashMap; import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath; @@ -29,23 +30,23 @@ /** A {@link PartitionMarkDoneAction} which add mark "PartitionEventType.LOAD_DONE". */ public class MarkPartitionDoneEventAction implements PartitionMarkDoneAction { - private final MetastoreClient metastoreClient; + private final PartitionHandler partitionHandler; - public MarkPartitionDoneEventAction(MetastoreClient metastoreClient) { - this.metastoreClient = metastoreClient; + public MarkPartitionDoneEventAction(PartitionHandler partitionHandler) { + this.partitionHandler = partitionHandler; } @Override public void markDone(String partition) throws Exception { LinkedHashMap partitionSpec = extractPartitionSpecFromPath(new Path(partition)); - metastoreClient.markPartitionDone(partitionSpec); + partitionHandler.markDonePartitions(Collections.singletonList(partitionSpec)); } @Override public void close() throws IOException { try { - metastoreClient.close(); + partitionHandler.close(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java index 4bdb49823d52..f5259f22054a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java @@ -19,8 +19,8 @@ package org.apache.paimon.partition.actions; import org.apache.paimon.CoreOptions; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.utils.StringUtils; import java.io.Closeable; @@ -55,10 +55,10 @@ static List createActions( fileStoreTable.fileIO(), fileStoreTable.location()); case DONE_PARTITION: return new AddDonePartitionAction( - createMetastoreClient(fileStoreTable, options)); + createPartitionHandler(fileStoreTable, options)); case MARK_EVENT: return new MarkPartitionDoneEventAction( - createMetastoreClient(fileStoreTable, options)); + createPartitionHandler(fileStoreTable, options)); case CUSTOM: return generateCustomMarkDoneAction(cl, options); default: @@ -87,20 +87,18 @@ static PartitionMarkDoneAction generateCustomMarkDoneAction( } } - static MetastoreClient createMetastoreClient(FileStoreTable table, CoreOptions options) { - MetastoreClient.Factory metastoreClientFactory = - table.catalogEnvironment().metastoreClientFactory(); + static PartitionHandler createPartitionHandler(FileStoreTable table, CoreOptions options) { + PartitionHandler partitionHandler = table.catalogEnvironment().partitionHandler(); if (options.toConfiguration().get(PARTITION_MARK_DONE_ACTION).contains("done-partition")) { checkNotNull( - metastoreClientFactory, - "Cannot mark done partition for table without metastore."); + partitionHandler, "Cannot mark done partition for table without metastore."); checkArgument( options.partitionedTableInMetastore(), "Table should enable %s", METASTORE_PARTITIONED_TABLE.key()); } - return metastoreClientFactory.create(); + return partitionHandler; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java index 6be09fa9b992..070486135a3f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java @@ -19,9 +19,11 @@ package org.apache.paimon.privilege; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.DelegateCatalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; +import org.apache.paimon.fs.FileIO; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.ConfigOptions; import org.apache.paimon.options.Options; @@ -45,22 +47,37 @@ public class PrivilegedCatalog extends DelegateCatalog { .stringType() .defaultValue(PrivilegeManager.PASSWORD_ANONYMOUS); + private final String warehouse; + private final FileIO fileIO; + private final String user; + private final String password; + private final PrivilegeManager privilegeManager; - public PrivilegedCatalog(Catalog wrapped, PrivilegeManager privilegeManager) { + public PrivilegedCatalog( + Catalog wrapped, String warehouse, FileIO fileIO, String user, String password) { super(wrapped); - this.privilegeManager = privilegeManager; + this.warehouse = warehouse; + this.fileIO = fileIO; + this.user = user; + this.password = password; + this.privilegeManager = new FileBasedPrivilegeManager(warehouse, fileIO, user, password); } public static Catalog tryToCreate(Catalog catalog, Options options) { - PrivilegeManager privilegeManager = - new FileBasedPrivilegeManager( + if (new FileBasedPrivilegeManager( catalog.warehouse(), catalog.fileIO(), options.get(PrivilegedCatalog.USER), - options.get(PrivilegedCatalog.PASSWORD)); - if (privilegeManager.privilegeEnabled()) { - catalog = new PrivilegedCatalog(catalog, privilegeManager); + options.get(PrivilegedCatalog.PASSWORD)) + .privilegeEnabled()) { + catalog = + new PrivilegedCatalog( + catalog, + catalog.warehouse(), + catalog.fileIO(), + options.get(PrivilegedCatalog.USER), + options.get(PrivilegedCatalog.PASSWORD)); } return catalog; } @@ -69,6 +86,12 @@ public PrivilegeManager privilegeManager() { return privilegeManager; } + @Override + public CatalogLoader catalogLoader() { + return new PrivilegedCatalogLoader( + wrapped.catalogLoader(), warehouse, fileIO, user, password); + } + @Override public void createDatabase(String name, boolean ignoreIfExists, Map properties) throws DatabaseAlreadyExistException { diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalogLoader.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalogLoader.java new file mode 100644 index 000000000000..f526330f03f1 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalogLoader.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.privilege; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; +import org.apache.paimon.fs.FileIO; + +/** Loader to create {@link PrivilegedCatalog}. */ +public class PrivilegedCatalogLoader implements CatalogLoader { + + private static final long serialVersionUID = 1L; + + private final CatalogLoader catalogLoader; + + private final String warehouse; + private final FileIO fileIO; + private final String user; + private final String password; + + public PrivilegedCatalogLoader( + CatalogLoader catalogLoader, + String warehouse, + FileIO fileIO, + String user, + String password) { + this.catalogLoader = catalogLoader; + this.warehouse = warehouse; + this.fileIO = fileIO; + this.user = user; + this.password = password; + } + + @Override + public Catalog load() { + Catalog catalog = catalogLoader.load(); + return new PrivilegedCatalog(catalog, warehouse, fileIO, user, password); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 8659fbf655bb..4c8ac03bb26e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -22,6 +22,7 @@ import org.apache.paimon.TableType; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; @@ -164,6 +165,11 @@ public Map options() { return options.toMap(); } + @Override + public CatalogLoader catalogLoader() { + throw new UnsupportedOperationException("Not supported yet."); + } + @Override public FileIO fileIO() { return fileIO; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 935469a8196d..ea10e824d505 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -29,7 +29,6 @@ import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.metastore.AddPartitionCommitCallback; import org.apache.paimon.metastore.AddPartitionTagCallback; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.metastore.TagPreviewCommitCallback; import org.apache.paimon.operation.DefaultValueAssigner; import org.apache.paimon.operation.FileStoreScan; @@ -464,34 +463,33 @@ protected List createCommitCallbacks(String commitUser) { List callbacks = new ArrayList<>(CallbackUtils.loadCommitCallbacks(coreOptions())); CoreOptions options = coreOptions(); - MetastoreClient.Factory metastoreClientFactory = - catalogEnvironment.metastoreClientFactory(); - - if (options.partitionedTableInMetastore() - && metastoreClientFactory != null - && !tableSchema.partitionKeys().isEmpty()) { - InternalRowPartitionComputer partitionComputer = - new InternalRowPartitionComputer( - options.partitionDefaultName(), - tableSchema.logicalPartitionType(), - tableSchema.partitionKeys().toArray(new String[0]), - options.legacyPartitionName()); - callbacks.add( - new AddPartitionCommitCallback( - metastoreClientFactory.create(), partitionComputer)); + + if (options.partitionedTableInMetastore() && !tableSchema.partitionKeys().isEmpty()) { + PartitionHandler partitionHandler = catalogEnvironment.partitionHandler(); + if (partitionHandler != null) { + InternalRowPartitionComputer partitionComputer = + new InternalRowPartitionComputer( + options.partitionDefaultName(), + tableSchema.logicalPartitionType(), + tableSchema.partitionKeys().toArray(new String[0]), + options.legacyPartitionName()); + callbacks.add(new AddPartitionCommitCallback(partitionHandler, partitionComputer)); + } } TagPreview tagPreview = TagPreview.create(options); if (options.tagToPartitionField() != null && tagPreview != null - && metastoreClientFactory != null && tableSchema.partitionKeys().isEmpty()) { - TagPreviewCommitCallback callback = - new TagPreviewCommitCallback( - new AddPartitionTagCallback( - metastoreClientFactory.create(), options.tagToPartitionField()), - tagPreview); - callbacks.add(callback); + PartitionHandler partitionHandler = catalogEnvironment.partitionHandler(); + if (partitionHandler != null) { + TagPreviewCommitCallback callback = + new TagPreviewCommitCallback( + new AddPartitionTagCallback( + partitionHandler, options.tagToPartitionField()), + tagPreview); + callbacks.add(callback); + } } return callbacks; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java index a722d9e21ada..855ccf934695 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java @@ -18,8 +18,8 @@ package org.apache.paimon.table; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.Lock; import javax.annotation.Nullable; @@ -34,17 +34,17 @@ public class CatalogEnvironment implements Serializable { @Nullable private final Identifier identifier; @Nullable private final String uuid; private final Lock.Factory lockFactory; - @Nullable private final MetastoreClient.Factory metastoreClientFactory; + @Nullable private final CatalogLoader catalogLoader; public CatalogEnvironment( @Nullable Identifier identifier, @Nullable String uuid, Lock.Factory lockFactory, - @Nullable MetastoreClient.Factory metastoreClientFactory) { + @Nullable CatalogLoader catalogLoader) { this.identifier = identifier; this.uuid = uuid; this.lockFactory = lockFactory; - this.metastoreClientFactory = metastoreClientFactory; + this.catalogLoader = catalogLoader; } public static CatalogEnvironment empty() { @@ -66,7 +66,10 @@ public Lock.Factory lockFactory() { } @Nullable - public MetastoreClient.Factory metastoreClientFactory() { - return metastoreClientFactory; + public PartitionHandler partitionHandler() { + if (catalogLoader == null) { + return null; + } + return PartitionHandler.create(catalogLoader.load(), identifier); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PartitionHandler.java b/paimon-core/src/main/java/org/apache/paimon/table/PartitionHandler.java new file mode 100644 index 000000000000..af783707ea43 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/PartitionHandler.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.partition.Partition; + +import java.util.List; +import java.util.Map; + +/** Handler to handle partitions. */ +public interface PartitionHandler extends AutoCloseable { + + void createPartitions(List> partitions) + throws Catalog.TableNotExistException; + + void dropPartitions(List> partitions) throws Catalog.TableNotExistException; + + void alterPartitions(List partitions) throws Catalog.TableNotExistException; + + void markDonePartitions(List> partitions) + throws Catalog.TableNotExistException; + + static PartitionHandler create(Catalog catalog, Identifier identifier) { + return new PartitionHandler() { + + @Override + public void createPartitions(List> partitions) + throws Catalog.TableNotExistException { + catalog.createPartitions(identifier, partitions); + } + + @Override + public void dropPartitions(List> partitions) + throws Catalog.TableNotExistException { + catalog.dropPartitions(identifier, partitions); + } + + @Override + public void alterPartitions(List partitions) + throws Catalog.TableNotExistException { + catalog.alterPartitions(identifier, partitions); + } + + @Override + public void markDonePartitions(List> partitions) + throws Catalog.TableNotExistException { + catalog.markDonePartitions(identifier, partitions); + } + + @Override + public void close() throws Exception { + catalog.close(); + } + }; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java index c028fa7421d5..f23b07c7676d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java @@ -59,9 +59,12 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.apache.paimon.data.BinaryString.fromString; +import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS; import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY; import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY; import static org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD; +import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM; +import static org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.doNothing; @@ -92,7 +95,7 @@ public void testListDatabasesWhenNoDatabases() { @Test public void testInvalidateWhenDatabaseIsAltered() throws Exception { Catalog mockcatalog = Mockito.mock(Catalog.class); - Catalog catalog = new CachingCatalog(mockcatalog); + Catalog catalog = new CachingCatalog(mockcatalog, new Options()); String databaseName = "db"; boolean ignoreIfExists = false; Database database = Database.of(databaseName); @@ -111,7 +114,7 @@ public void testInvalidateWhenDatabaseIsAltered() throws Exception { @Test public void testInvalidateSystemTablesIfBaseTableIsModified() throws Exception { - Catalog catalog = new CachingCatalog(this.catalog); + Catalog catalog = new CachingCatalog(this.catalog, new Options()); Identifier tableIdent = new Identifier("db", "tbl"); catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA, false); Identifier sysIdent = new Identifier("db", "tbl$files"); @@ -369,14 +372,14 @@ public void testManifestCache() throws Exception { } private void innerTestManifestCache(long manifestCacheThreshold) throws Exception { - Catalog catalog = - new CachingCatalog( - this.catalog, - Duration.ofSeconds(10), - MemorySize.ofMebiBytes(1), - manifestCacheThreshold, - 0L, - 10); + Options options = new Options(); + options.set(CACHE_EXPIRATION_INTERVAL_MS, Duration.ofSeconds(10)); + options.set(CACHE_MANIFEST_SMALL_FILE_MEMORY, MemorySize.ofMebiBytes(1)); + options.set( + CACHE_MANIFEST_SMALL_FILE_THRESHOLD, MemorySize.ofBytes(manifestCacheThreshold)); + options.set(CACHE_PARTITION_MAX_NUM, 0L); + options.set(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE, 10); + Catalog catalog = new CachingCatalog(this.catalog, options); Identifier tableIdent = new Identifier("db", "tbl"); catalog.dropTable(tableIdent, true); catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false); diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java index 0eaf23a1a28d..05dbae004ada 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java @@ -18,7 +18,7 @@ package org.apache.paimon.catalog; -import org.apache.paimon.options.MemorySize; +import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.table.Table; @@ -29,6 +29,9 @@ import java.util.List; import java.util.Optional; +import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS; +import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM; + /** * A wrapper around CachingCatalog that provides accessor methods to test the underlying cache, * without making those fields public in the CachingCatalog itself. @@ -38,17 +41,18 @@ public class TestableCachingCatalog extends CachingCatalog { private final Duration cacheExpirationInterval; public TestableCachingCatalog(Catalog catalog, Duration expirationInterval, Ticker ticker) { - super( - catalog, - expirationInterval, - MemorySize.ZERO, - Long.MAX_VALUE, - Long.MAX_VALUE, - Integer.MAX_VALUE, - ticker); + super(catalog, createOptions(expirationInterval)); + init(ticker); this.cacheExpirationInterval = expirationInterval; } + private static Options createOptions(Duration expirationInterval) { + Options options = new Options(); + options.set(CACHE_EXPIRATION_INTERVAL_MS, expirationInterval); + options.set(CACHE_PARTITION_MAX_NUM, 100L); + return options; + } + public Cache tableCache() { // cleanUp must be called as tests apply assertions directly on the underlying map, but // metadata table map entries are cleaned up asynchronously. diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java index 893fe1bf5762..8e7679a5ade0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.Path; @@ -27,14 +28,16 @@ import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.options.Options; +import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.PartitionHandler; +import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.sink.StreamTableCommit; @@ -72,6 +75,7 @@ import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER; import static org.apache.paimon.CoreOptions.PATH; import static org.apache.paimon.CoreOptions.WRITE_ONLY; +import static org.apache.paimon.CoreOptions.createCommitUser; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -82,7 +86,7 @@ public class PartitionExpireTest { private Path path; private FileStoreTable table; - private List> deletedPartitions; + private List> deletedPartitions; @BeforeEach public void beforeEach() { @@ -97,36 +101,45 @@ private void newTable() { String branchName = CoreOptions.branch(options.toMap()); TableSchema tableSchema = new SchemaManager(fileIO, tablePath, branchName).latest().get(); deletedPartitions = new ArrayList<>(); - MetastoreClient.Factory factory = - () -> - new MetastoreClient() { - @Override - public void addPartition(LinkedHashMap partition) {} - - @Override - public void addPartitions( - List> partitions) {} - - @Override - public void dropPartition(LinkedHashMap partition) { - deletedPartitions.add(partition); - } - - @Override - public void dropPartitions( - List> partitions) { - deletedPartitions.addAll(partitions); - } - - @Override - public void markPartitionDone(LinkedHashMap partition) { - throw new UnsupportedOperationException(); - } - - @Override - public void close() {} - }; - CatalogEnvironment env = new CatalogEnvironment(null, null, Lock.emptyFactory(), factory); + PartitionHandler partitionHandler = + new PartitionHandler() { + @Override + public void createPartitions(List> partitions) + throws Catalog.TableNotExistException {} + + @Override + public void dropPartitions(List> partitions) + throws Catalog.TableNotExistException { + deletedPartitions.addAll(partitions); + try (FileStoreCommit commit = + table.store() + .newCommit( + createCommitUser( + table.coreOptions().toConfiguration()))) { + commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER); + } + } + + @Override + public void alterPartitions(List partitions) + throws Catalog.TableNotExistException {} + + @Override + public void markDonePartitions(List> partitions) + throws Catalog.TableNotExistException {} + + @Override + public void close() throws Exception {} + }; + + CatalogEnvironment env = + new CatalogEnvironment(null, null, Lock.emptyFactory(), null) { + + @Override + public PartitionHandler partitionHandler() { + return partitionHandler; + } + }; table = FileStoreTableFactory.create(fileIO, path, tableSchema, env); } diff --git a/paimon-core/src/test/java/org/apache/paimon/privilege/PrivilegedCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/privilege/PrivilegedCatalogTest.java index 019707108730..7f7b73b76f65 100644 --- a/paimon-core/src/test/java/org/apache/paimon/privilege/PrivilegedCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/privilege/PrivilegedCatalogTest.java @@ -42,8 +42,10 @@ public class PrivilegedCatalogTest extends FileSystemCatalogTest { @BeforeEach public void setUp() throws Exception { super.setUp(); - getPrivilegeManager("anonymous", "anonymous").initializePrivilege(PASSWORD_ROOT); - catalog = new PrivilegedCatalog(catalog, getPrivilegeManager("root", PASSWORD_ROOT)); + create(catalog, "anonymous", "anonymous") + .privilegeManager() + .initializePrivilege(PASSWORD_ROOT); + catalog = create(catalog, "root", PASSWORD_ROOT); } @Override @@ -54,10 +56,8 @@ public void testGetTable() throws Exception { PrivilegedCatalog rootCatalog = ((PrivilegedCatalog) catalog); rootCatalog.createPrivilegedUser(USERNAME_TEST_USER, PASSWORD_TEST_USER); - Catalog userCatalog = - new PrivilegedCatalog( - rootCatalog.wrapped(), - getPrivilegeManager(USERNAME_TEST_USER, PASSWORD_TEST_USER)); + Catalog userCatalog = create(rootCatalog.wrapped(), USERNAME_TEST_USER, PASSWORD_TEST_USER); + FileStoreTable dataTable = (FileStoreTable) userCatalog.getTable(identifier); assertNoPrivilege(dataTable::snapshotManager); @@ -65,10 +65,7 @@ public void testGetTable() throws Exception { assertNoPrivilege(() -> dataTable.snapshot(0)); rootCatalog.grantPrivilegeOnTable(USERNAME_TEST_USER, identifier, PrivilegeType.SELECT); - userCatalog = - new PrivilegedCatalog( - rootCatalog.wrapped(), - getPrivilegeManager(USERNAME_TEST_USER, PASSWORD_TEST_USER)); + userCatalog = create(rootCatalog.wrapped(), USERNAME_TEST_USER, PASSWORD_TEST_USER); FileStoreTable dataTable2 = (FileStoreTable) userCatalog.getTable(identifier); assertThat(dataTable2.snapshotManager().latestSnapshotId()).isNull(); @@ -76,8 +73,8 @@ public void testGetTable() throws Exception { assertThatThrownBy(() -> dataTable2.snapshot(0)).isNotNull(); } - private FileBasedPrivilegeManager getPrivilegeManager(String user, String password) { - return new FileBasedPrivilegeManager(warehouse, fileIO, user, password); + private PrivilegedCatalog create(Catalog catalog, String user, String password) { + return new PrivilegedCatalog(catalog, warehouse, fileIO, user, password); } private void assertNoPrivilege(Executable executable) { diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index 1c0d73cfbe38..36e2bd1f0964 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -21,7 +21,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.FileStore; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.TimeUtils; @@ -32,7 +31,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy; @@ -88,12 +86,7 @@ public String[] call( CoreOptions.fromMap(map), fileStore.partitionType()), fileStore.newScan(), fileStore.newCommit(""), - Optional.ofNullable( - fileStoreTable - .catalogEnvironment() - .metastoreClientFactory()) - .map(MetastoreClient.Factory::create) - .orElse(null), + fileStoreTable.catalogEnvironment().partitionHandler(), fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { partitionExpire.withMaxExpireNum(maxExpires); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java index 0dc96bd93a7e..7448d8f4d3a5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java @@ -20,7 +20,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.FileStore; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.TimeUtils; @@ -28,7 +27,6 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy; @@ -66,12 +64,7 @@ public ExpirePartitionsAction( CoreOptions.fromMap(map), fileStore.partitionType()), fileStore.newScan(), fileStore.newCommit(""), - Optional.ofNullable( - fileStoreTable - .catalogEnvironment() - .metastoreClientFactory()) - .map(MetastoreClient.Factory::create) - .orElse(null), + fileStoreTable.catalogEnvironment().partitionHandler(), fileStore.options().partitionExpireMaxNum()); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index ce282c6800cc..b9435e12ed44 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -21,7 +21,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.FileStore; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.TimeUtils; @@ -36,7 +35,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy; @@ -92,12 +90,7 @@ public String identifier() { CoreOptions.fromMap(map), fileStore.partitionType()), fileStore.newScan(), fileStore.newCommit(""), - Optional.ofNullable( - fileStoreTable - .catalogEnvironment() - .metastoreClientFactory()) - .map(MetastoreClient.Factory::create) - .orElse(null), + fileStoreTable.catalogEnvironment().partitionHandler(), fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { partitionExpire.withMaxExpireNum(maxExpires); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java index 84542af4768b..f4cd8cfc5300 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java @@ -21,9 +21,9 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.fs.Path; import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.partition.Partition; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.table.source.snapshot.SnapshotReader; @@ -36,6 +36,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -46,13 +47,14 @@ public class PartitionStatisticsReporter implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(PartitionStatisticsReporter.class); - private final MetastoreClient metastoreClient; + private final PartitionHandler partitionHandler; private final SnapshotReader snapshotReader; private final SnapshotManager snapshotManager; - public PartitionStatisticsReporter(FileStoreTable table, MetastoreClient client) { - this.metastoreClient = - Preconditions.checkNotNull(client, "the metastore client factory is null"); + public PartitionStatisticsReporter(FileStoreTable table, PartitionHandler partitionHandler) { + this.partitionHandler = + Preconditions.checkNotNull( + partitionHandler, "the partition handler factory is null"); this.snapshotReader = table.newSnapshotReader(); this.snapshotManager = table.snapshotManager(); } @@ -85,14 +87,14 @@ public void report(String partition, long modifyTimeMillis) throws Exception { Partition partitionStats = new Partition(partitionSpec, fileCount, totalSize, rowCount, modifyTimeMillis); LOG.info("alter partition {} with statistic {}.", partitionSpec, partitionStats); - metastoreClient.alterPartition(partitionStats); + partitionHandler.alterPartitions(Collections.singletonList(partitionStats)); } } @Override public void close() throws IOException { try { - metastoreClient.close(); + partitionHandler.close(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java index ca51c3df5b1a..d8b24adde719 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java @@ -23,6 +23,7 @@ import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.utils.InternalRowPartitionComputer; @@ -157,7 +158,9 @@ public static Optional create( return Optional.empty(); } - if (table.catalogEnvironment().metastoreClientFactory() == null) { + PartitionHandler partitionHandler = table.catalogEnvironment().partitionHandler(); + + if (partitionHandler == null) { return Optional.empty(); } @@ -171,9 +174,7 @@ public static Optional create( return Optional.of( new ReportPartStatsListener( partitionComputer, - new PartitionStatisticsReporter( - table, - table.catalogEnvironment().metastoreClientFactory().create()), + new PartitionStatisticsReporter(table, partitionHandler), stateStore, isRestored, options.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java index 3c5cd2f8e927..f913a9a5a91f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java @@ -18,15 +18,17 @@ package org.apache.paimon.flink.sink.partition; -import org.apache.paimon.metastore.MetastoreClient; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.actions.AddDonePartitionAction; +import org.apache.paimon.table.PartitionHandler; import org.junit.jupiter.api.Test; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -39,46 +41,38 @@ class AddDonePartitionActionTest { public void test() throws Exception { AtomicBoolean closed = new AtomicBoolean(false); Set donePartitions = new HashSet<>(); - MetastoreClient metastoreClient = - new MetastoreClient() { + PartitionHandler partitionHandler = + new PartitionHandler() { @Override - public void addPartition(LinkedHashMap partition) { - donePartitions.add(generatePartitionPath(partition)); - } - - @Override - public void addPartitions(List> partitions) { - partitions.forEach(this::addPartition); - } - - @Override - public void dropPartition(LinkedHashMap partition) { - throw new UnsupportedOperationException(); + public void close() throws Exception { + closed.set(true); } @Override - public void dropPartitions(List> partitions) { - throw new UnsupportedOperationException(); + public void createPartitions(List> partitions) + throws Catalog.TableNotExistException { + partitions.forEach( + partition -> + donePartitions.add( + generatePartitionPath( + new LinkedHashMap<>(partition)))); } @Override - public void markPartitionDone(LinkedHashMap partitions) { - throw new UnsupportedOperationException(); - } + public void dropPartitions(List> partitions) + throws Catalog.TableNotExistException {} @Override - public void alterPartition(Partition partition) { - throw new UnsupportedOperationException(); - } + public void alterPartitions(List partitions) + throws Catalog.TableNotExistException {} @Override - public void close() { - closed.set(true); - } + public void markDonePartitions(List> partitions) + throws Catalog.TableNotExistException {} }; - AddDonePartitionAction action = new AddDonePartitionAction(metastoreClient); + AddDonePartitionAction action = new AddDonePartitionAction(partitionHandler); // test normal action.markDone("dt=20201202"); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java index 3c01772d6d3b..c888d6eda062 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java @@ -22,12 +22,12 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.PartitionHandler; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.CommitMessage; @@ -43,7 +43,6 @@ import org.junit.jupiter.api.io.TempDir; import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -87,41 +86,34 @@ public void testReportAction() throws Exception { AtomicBoolean closed = new AtomicBoolean(false); Map partitionParams = Maps.newHashMap(); - MetastoreClient client = - new MetastoreClient() { + PartitionHandler partitionHandler = + new PartitionHandler() { @Override - public void addPartition(LinkedHashMap partition) { + public void createPartitions(List> partitions) { throw new UnsupportedOperationException(); } @Override - public void addPartitions(List> partitions) { + public void dropPartitions(List> partitions) { throw new UnsupportedOperationException(); } @Override - public void dropPartition(LinkedHashMap partition) { + public void markDonePartitions(List> partitions) { throw new UnsupportedOperationException(); } @Override - public void dropPartitions(List> partitions) { - throw new UnsupportedOperationException(); - } - - @Override - public void markPartitionDone(LinkedHashMap partitionSpec) { - throw new UnsupportedOperationException(); - } - - @Override - public void alterPartition(Partition partition) { - partitionParams.put( - PartitionPathUtils.generatePartitionPath( - partition.spec(), - table.rowType().project(table.partitionKeys())), - partition); + public void alterPartitions(List partitions) { + partitions.forEach( + partition -> { + partitionParams.put( + PartitionPathUtils.generatePartitionPath( + partition.spec(), + table.rowType().project(table.partitionKeys())), + partition); + }); } @Override @@ -130,7 +122,8 @@ public void close() { } }; - PartitionStatisticsReporter action = new PartitionStatisticsReporter(table, client); + PartitionStatisticsReporter action = + new PartitionStatisticsReporter(table, partitionHandler); long time = 1729598544974L; action.report("c1=a/", time); Assertions.assertThat(partitionParams).containsKey("c1=a/"); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 5afb60e84f82..64bdc8140b71 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -23,6 +23,7 @@ import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.CatalogLockContext; import org.apache.paimon.catalog.CatalogLockFactory; import org.apache.paimon.catalog.Identifier; @@ -31,7 +32,6 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.hive.pool.CachedClientPool; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; @@ -68,6 +68,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; @@ -204,15 +205,6 @@ public Optional lockContext() { new SerializableHiveConf(hiveConf), clientClassName, catalogOptions)); } - @Override - public Optional metastoreClientFactory(Identifier identifier) { - Identifier tableIdentifier = - new Identifier(identifier.getDatabaseName(), identifier.getTableName()); - return Optional.of( - new HiveMetastoreClient.Factory( - tableIdentifier, hiveConf, clientClassName, options)); - } - @Override public Path getTableLocation(Identifier identifier) { Table table = null; @@ -369,15 +361,31 @@ public void createPartitions(Identifier identifier, List> pa } int currentTime = (int) (System.currentTimeMillis() / 1000); - + StorageDescriptor sd = hmsTable.getSd(); + String dataFilePath = + hmsTable.getParameters().containsKey(DATA_FILE_PATH_DIRECTORY.key()) + ? sd.getLocation() + + "/" + + hmsTable.getParameters().get(DATA_FILE_PATH_DIRECTORY.key()) + : sd.getLocation(); + List hivePartitions = new ArrayList<>(); + for (Map partitionSpec : partitions) { + Partition hivePartition = new Partition(); + StorageDescriptor newSd = new StorageDescriptor(sd); + newSd.setLocation( + dataFilePath + + "/" + + PartitionPathUtils.generatePartitionPath( + new LinkedHashMap<>(partitionSpec))); + hivePartition.setDbName(identifier.getDatabaseName()); + hivePartition.setTableName(identifier.getTableName()); + hivePartition.setValues(new ArrayList<>(partitionSpec.values())); + hivePartition.setSd(newSd); + hivePartition.setCreateTime(currentTime); + hivePartition.setLastAccessTime(currentTime); + hivePartitions.add(hivePartition); + } try { - List hivePartitions = - toHivePartitions( - identifier, - location.toString(), - hmsTable.getSd(), - partitions, - currentTime); clients.execute(client -> client.add_partitions(hivePartitions, true, false)); } catch (Exception e) { throw new RuntimeException(e); @@ -469,6 +477,27 @@ && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) { } } + @Override + public void markDonePartitions(Identifier identifier, List> partitions) + throws TableNotExistException { + try { + clients.execute( + client -> { + for (Map partition : partitions) { + client.markPartitionForEvent( + identifier.getDatabaseName(), + identifier.getTableName(), + partition, + PartitionEventType.LOAD_DONE); + } + }); + } catch (NoSuchObjectException e) { + // do nothing if the partition not exists + } catch (TException | InterruptedException e) { + throw new RuntimeException(e); + } + } + @Override public List listPartitions(Identifier identifier) throws TableNotExistException { @@ -809,7 +838,7 @@ public org.apache.paimon.table.Table getDataOrFormatTable(Identifier identifier) lockFactory().orElse(null), lockContext().orElse(null), identifier), - metastoreClientFactory(identifier).orElse(null))); + catalogLoader())); } catch (TableNotExistException ignore) { } @@ -1179,9 +1208,8 @@ public void repairTable(Identifier identifier) throws TableNotExistException { tableSchema.logicalPartitionType(), tableSchema.partitionKeys().toArray(new String[0]), options.legacyPartitionName()); - @SuppressWarnings("resource") - HiveMetastoreClient metastoreClient = new HiveMetastoreClient(identifier, clients); - metastoreClient.addPartitions( + createPartitions( + identifier, getTable(identifier).newReadBuilder().newScan().listPartitions().stream() .map(partitionComputer::generatePartValues) .collect(Collectors.toList())); @@ -1201,6 +1229,12 @@ public String warehouse() { return warehouse; } + @Override + public CatalogLoader catalogLoader() { + return new HiveCatalogLoader( + fileIO, new SerializableHiveConf(hiveConf), clientClassName, options, warehouse); + } + public Table getHmsTable(Identifier identifier) throws TableNotExistException { try { return clients.run( @@ -1607,30 +1641,4 @@ public int getBatchGetTableSize() { return DEFAULT_TABLE_BATCH_SIZE; } } - - private List toHivePartitions( - Identifier identifier, - String tablePath, - StorageDescriptor sd, - List> partitions, - int currentTime) { - List hivePartitions = new ArrayList<>(); - for (Map partitionSpec : partitions) { - Partition hivePartition = new Partition(); - StorageDescriptor newSd = new StorageDescriptor(sd); - newSd.setLocation( - tablePath - + "/" - + PartitionPathUtils.generatePartitionPath( - new LinkedHashMap<>(partitionSpec))); - hivePartition.setDbName(identifier.getDatabaseName()); - hivePartition.setTableName(identifier.getTableName()); - hivePartition.setValues(new ArrayList<>(partitionSpec.values())); - hivePartition.setSd(newSd); - hivePartition.setCreateTime(currentTime); - hivePartition.setLastAccessTime(currentTime); - hivePartitions.add(hivePartition); - } - return hivePartitions; - } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLoader.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLoader.java new file mode 100644 index 000000000000..ff389434d41a --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogLoader.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.options.Options; + +/** Loader to create {@link HiveCatalog}. */ +public class HiveCatalogLoader implements CatalogLoader { + + private static final long serialVersionUID = 1L; + + private final FileIO fileIO; + private final SerializableHiveConf hiveConf; + private final String clientClassName; + private final Options options; + private final String warehouse; + + public HiveCatalogLoader( + FileIO fileIO, + SerializableHiveConf hiveConf, + String clientClassName, + Options options, + String warehouse) { + this.fileIO = fileIO; + this.hiveConf = hiveConf; + this.clientClassName = clientClassName; + this.options = options; + this.warehouse = warehouse; + } + + @Override + public Catalog load() { + return new HiveCatalog(fileIO, hiveConf.conf(), clientClassName, options, warehouse); + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java deleted file mode 100644 index 755b2df2069f..000000000000 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.hive; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.client.ClientPool; -import org.apache.paimon.hive.pool.CachedClientPool; -import org.apache.paimon.metastore.MetastoreClient; -import org.apache.paimon.options.Options; -import org.apache.paimon.utils.PartitionPathUtils; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.PartitionEventType; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.thrift.TException; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP; -import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP; -import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP; -import static org.apache.paimon.catalog.Catalog.TOTAL_SIZE_PROP; - -/** {@link MetastoreClient} for Hive tables. */ -public class HiveMetastoreClient implements MetastoreClient { - - private static final String HIVE_LAST_UPDATE_TIME_PROP = "transient_lastDdlTime"; - - private final Identifier identifier; - - private final ClientPool clients; - private final List partitionKeys; - private final StorageDescriptor sd; - private final String dataFilePath; - - HiveMetastoreClient(Identifier identifier, ClientPool clients) - throws TException, InterruptedException { - this.identifier = identifier; - this.clients = clients; - Table table = - this.clients.run( - client -> - client.getTable( - identifier.getDatabaseName(), identifier.getTableName())); - this.partitionKeys = - table.getPartitionKeys().stream() - .map(FieldSchema::getName) - .collect(Collectors.toList()); - this.sd = table.getSd(); - this.dataFilePath = - table.getParameters().containsKey(CoreOptions.DATA_FILE_PATH_DIRECTORY.key()) - ? sd.getLocation() - + "/" - + table.getParameters() - .get(CoreOptions.DATA_FILE_PATH_DIRECTORY.key()) - : sd.getLocation(); - } - - @Override - public void addPartition(LinkedHashMap partition) throws Exception { - Partition hivePartition = - toHivePartition(partition, (int) (System.currentTimeMillis() / 1000)); - clients.execute( - client -> { - try { - client.add_partition(hivePartition); - } catch (AlreadyExistsException ignore) { - } - }); - } - - @Override - public void addPartitions(List> partitions) throws Exception { - int currentTime = (int) (System.currentTimeMillis() / 1000); - List hivePartitions = - partitions.stream() - .map(partitionSpec -> toHivePartition(partitionSpec, currentTime)) - .collect(Collectors.toList()); - clients.execute(client -> client.add_partitions(hivePartitions, true, false)); - } - - @Override - public void alterPartition(org.apache.paimon.partition.Partition partition) throws Exception { - Map spec = partition.spec(); - List partitionValues = - partitionKeys.stream().map(spec::get).collect(Collectors.toList()); - - Map statistic = new HashMap<>(); - statistic.put(NUM_FILES_PROP, String.valueOf(partition.fileCount())); - statistic.put(TOTAL_SIZE_PROP, String.valueOf(partition.fileSizeInBytes())); - statistic.put(NUM_ROWS_PROP, String.valueOf(partition.recordCount())); - - String modifyTimeSeconds = String.valueOf(partition.lastFileCreationTime() / 1000); - statistic.put(LAST_UPDATE_TIME_PROP, modifyTimeSeconds); - - // just for being compatible with hive metastore - statistic.put(HIVE_LAST_UPDATE_TIME_PROP, modifyTimeSeconds); - - try { - Partition hivePartition = - clients.run( - client -> - client.getPartition( - identifier.getDatabaseName(), - identifier.getObjectName(), - partitionValues)); - hivePartition.setValues(partitionValues); - hivePartition.setLastAccessTime((int) (partition.lastFileCreationTime() / 1000)); - hivePartition.getParameters().putAll(statistic); - clients.execute( - client -> - client.alter_partition( - identifier.getDatabaseName(), - identifier.getObjectName(), - hivePartition)); - } catch (NoSuchObjectException e) { - // do nothing if the partition not exists - } - } - - @Override - public void dropPartition(LinkedHashMap partitionSpec) throws Exception { - List partitionValues = new ArrayList<>(partitionSpec.values()); - try { - clients.execute( - client -> - client.dropPartition( - identifier.getDatabaseName(), - identifier.getTableName(), - partitionValues, - false)); - } catch (NoSuchObjectException e) { - // do nothing if the partition not exists - } - } - - @Override - public void dropPartitions(List> partitions) throws Exception { - for (LinkedHashMap partition : partitions) { - dropPartition(partition); - } - } - - @Override - public void markPartitionDone(LinkedHashMap partitionSpec) throws Exception { - try { - clients.execute( - client -> - client.markPartitionForEvent( - identifier.getDatabaseName(), - identifier.getTableName(), - partitionSpec, - PartitionEventType.LOAD_DONE)); - } catch (NoSuchObjectException e) { - // do nothing if the partition not exists - } - } - - @Override - public void close() throws Exception { - // do nothing - } - - public IMetaStoreClient client() throws TException, InterruptedException { - return clients.run(client -> client); - } - - private Partition toHivePartition( - LinkedHashMap partitionSpec, int currentTime) { - Partition hivePartition = new Partition(); - StorageDescriptor newSd = new StorageDescriptor(sd); - newSd.setLocation( - dataFilePath + "/" + PartitionPathUtils.generatePartitionPath(partitionSpec)); - hivePartition.setDbName(identifier.getDatabaseName()); - hivePartition.setTableName(identifier.getTableName()); - hivePartition.setValues(new ArrayList<>(partitionSpec.values())); - hivePartition.setSd(newSd); - hivePartition.setCreateTime(currentTime); - hivePartition.setLastAccessTime(currentTime); - return hivePartition; - } - - /** Factory to create {@link HiveMetastoreClient}. */ - public static class Factory implements MetastoreClient.Factory { - - private static final long serialVersionUID = 1L; - - private final Identifier identifier; - private final SerializableHiveConf hiveConf; - private final String clientClassName; - private final Options options; - - public Factory( - Identifier identifier, HiveConf hiveConf, String clientClassName, Options options) { - this.identifier = identifier; - this.hiveConf = new SerializableHiveConf(hiveConf); - this.clientClassName = clientClassName; - this.options = options; - } - - @Override - public MetastoreClient create() { - HiveConf conf = hiveConf.conf(); - try { - return new HiveMetastoreClient( - identifier, new CachedClientPool(conf, options, clientClassName)); - } catch (TException e) { - throw new RuntimeException( - "Can not get table " + identifier + " info from metastore.", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException( - "Interrupted in call to new HiveMetastoreClient for table " + identifier, - e); - } - } - } -} diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index c39b85cb3d22..e890d0ebf2ad 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -19,11 +19,11 @@ package org.apache.paimon.hive; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.DelegateCatalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkCatalog; import org.apache.paimon.hive.annotation.Minio; import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.Lock; import org.apache.paimon.privilege.NoPrivilegeException; import org.apache.paimon.s3.MinioTestContainer; @@ -1513,11 +1513,11 @@ public void testMarkDone() throws Exception { Identifier identifier = new Identifier("test_db", "mark_done_t2"); Table table = catalog.getTable(identifier); assertThat(table).isInstanceOf(FileStoreTable.class); - FileStoreTable fileStoreTable = (FileStoreTable) table; - MetastoreClient.Factory metastoreClientFactory = - fileStoreTable.catalogEnvironment().metastoreClientFactory(); - HiveMetastoreClient metastoreClient = (HiveMetastoreClient) metastoreClientFactory.create(); - IMetaStoreClient hmsClient = metastoreClient.client(); + while (catalog instanceof DelegateCatalog) { + catalog = ((DelegateCatalog) catalog).wrapped(); + } + HiveCatalog hiveCatalog = (HiveCatalog) catalog; + IMetaStoreClient hmsClient = hiveCatalog.getHmsClient(); Map partitionSpec = Collections.singletonMap("dt", "20240501"); // LOAD_DONE event is not marked by now. assertThat( diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java index e3a53d2bd2ef..4b9d50db8d94 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java @@ -20,7 +20,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.FileStore; -import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.TimeUtils; @@ -37,7 +36,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy; import static org.apache.spark.sql.types.DataTypes.IntegerType; @@ -102,12 +100,7 @@ public InternalRow[] call(InternalRow args) { CoreOptions.fromMap(map), fileStore.partitionType()), fileStore.newScan(), fileStore.newCommit(""), - Optional.ofNullable( - fileStoreTable - .catalogEnvironment() - .metastoreClientFactory()) - .map(MetastoreClient.Factory::create) - .orElse(null), + fileStoreTable.catalogEnvironment().partitionHandler(), fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { partitionExpire.withMaxExpireNum(maxExpires); diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala index c385f243ae66..5a6abfe2849d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala @@ -19,9 +19,7 @@ package org.apache.paimon.spark import org.apache.paimon.CoreOptions -import org.apache.paimon.metastore.MetastoreClient import org.apache.paimon.operation.FileStoreCommit -import org.apache.paimon.spark.data.SparkInternalRow import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink.BatchWriteBuilder import org.apache.paimon.types.RowType @@ -44,8 +42,7 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement { override lazy val partitionSchema: StructType = SparkTypeUtils.fromPaimonRowType(partitionRowType) - private def toPaimonPartitions( - rows: Array[InternalRow]): Array[java.util.LinkedHashMap[String, String]] = { + private def toPaimonPartitions(rows: Array[InternalRow]): Array[java.util.Map[String, String]] = { table match { case fileStoreTable: FileStoreTable => val rowConverter = CatalystTypeConverters @@ -69,21 +66,20 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement { override def dropPartitions(rows: Array[InternalRow]): Boolean = { table match { case fileStoreTable: FileStoreTable => - val partitions = toPaimonPartitions(rows).map(_.asInstanceOf[JMap[String, String]]) - val commit: FileStoreCommit = fileStoreTable.store.newCommit(UUID.randomUUID.toString) - var metastoreClient: MetastoreClient = null - val clientFactory = fileStoreTable.catalogEnvironment().metastoreClientFactory - try { - commit.dropPartitions(partitions.toSeq.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER) - // sync to metastore with delete partitions - if (clientFactory != null && fileStoreTable.coreOptions().partitionedTableInMetastore()) { - metastoreClient = clientFactory.create() - metastoreClient.dropPartitions(toPaimonPartitions(rows).toSeq.asJava) + val partitions = toPaimonPartitions(rows).toSeq.asJava + val partitionHandler = fileStoreTable.catalogEnvironment().partitionHandler() + if (partitionHandler != null) { + try { + partitionHandler.dropPartitions(partitions) + } finally { + partitionHandler.close() } - } finally { - commit.close() - if (metastoreClient != null) { - metastoreClient.close() + } else { + val commit: FileStoreCommit = fileStoreTable.store.newCommit(UUID.randomUUID.toString) + try { + commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER) + } finally { + commit.close() } } true @@ -140,18 +136,17 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement { table match { case fileStoreTable: FileStoreTable => val partitions = toPaimonPartitions(rows) - val metastoreFactory = fileStoreTable.catalogEnvironment().metastoreClientFactory() - if (metastoreFactory == null) { + val partitionHandler = fileStoreTable.catalogEnvironment().partitionHandler() + if (partitionHandler == null) { throw new UnsupportedOperationException( "The table must have metastore to create partition.") } - val metastoreClient: MetastoreClient = metastoreFactory.create try { if (fileStoreTable.coreOptions().partitionedTableInMetastore()) { - partitions.foreach(metastoreClient.addPartition) + partitionHandler.createPartitions(partitions.toSeq.asJava) } } finally { - metastoreClient.close() + partitionHandler.close() } case _ => throw new UnsupportedOperationException("Only FileStoreTable supports create partitions.") diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala index 179020ae5b74..b90fe8654925 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala @@ -18,8 +18,9 @@ package org.apache.paimon.spark.sql +import org.apache.paimon.catalog.DelegateCatalog import org.apache.paimon.fs.Path -import org.apache.paimon.hive.HiveMetastoreClient +import org.apache.paimon.hive.HiveCatalog import org.apache.paimon.spark.PaimonHiveTestBase import org.apache.paimon.table.FileStoreTable @@ -315,12 +316,6 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { |""".stripMargin) val table = loadTable(dbName, tblName) - val metastoreClient = table - .catalogEnvironment() - .metastoreClientFactory() - .create() - .asInstanceOf[HiveMetastoreClient] - .client() val fileIO = table.fileIO() def containsDir(root: Path, targets: Array[String]): Boolean = { @@ -333,7 +328,12 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { spark.sql(s"show partitions $tblName"), Seq(Row("pt=1"), Row("pt=2"), Row("pt=3"))) // check partitions in HMS - assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3) + var catalog = paimonCatalog + while (catalog.isInstanceOf[DelegateCatalog]) { + catalog = catalog.asInstanceOf[DelegateCatalog].wrapped() + } + val hmsClient = catalog.asInstanceOf[HiveCatalog].getHmsClient + assert(hmsClient.listPartitions(dbName, tblName, 100).size() == 3) // check partitions in filesystem if (dataFilePathDir.isEmpty) { assert(containsDir(table.location(), Array("pt=1", "pt=2", "pt=3"))) @@ -347,13 +347,13 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { checkAnswer( spark.sql(s"show partitions $tblName"), Seq(Row("pt=1"), Row("pt=2"), Row("pt=3"), Row("pt=4"))) - assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 4) + assert(hmsClient.listPartitions(dbName, tblName, 100).size() == 4) spark.sql(s"ALTER TABLE $tblName DROP PARTITION (pt=1)") checkAnswer( spark.sql(s"show partitions $tblName"), Seq(Row("pt=2"), Row("pt=3"), Row("pt=4"))) - assert(metastoreClient.listPartitions(dbName, tblName, 100).size() == 3) + assert(hmsClient.listPartitions(dbName, tblName, 100).size() == 3) } } } From 0b813b29cda365741090467b2ef5290ed31fa511 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Thu, 9 Jan 2025 14:00:45 +0800 Subject: [PATCH 2/3] fix comment --- .../main/java/org/apache/paimon/catalog/CatalogLoader.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java index 0f75a313d7c1..9f68affbfe36 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogLoader.java @@ -22,7 +22,11 @@ import java.io.Serializable; -/** Loader for creating a {@link Catalog}. */ +/** + * Loader for creating a {@link Catalog}. + * + * @since 1.1.0 + */ @Public @FunctionalInterface public interface CatalogLoader extends Serializable { From 87706a00383c0be9758ae077e38a1cd24f185ca3 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Thu, 9 Jan 2025 14:44:55 +0800 Subject: [PATCH 3/3] Introduce RESTCatalogLoader --- .../org/apache/paimon/rest/HttpClient.java | 27 +++++--- .../apache/paimon/rest/HttpClientOptions.java | 40 ++++++------ .../org/apache/paimon/rest/RESTCatalog.java | 63 +++++++------------ .../apache/paimon/rest/RESTCatalogLoader.java | 43 +++++++++++++ .../java/org/apache/paimon/rest/RESTUtil.java | 6 ++ .../org/apache/paimon/rest/ResourcePaths.java | 6 +- .../apache/paimon/rest/auth/AuthSession.java | 19 ++++++ .../apache/paimon/rest/HttpClientTest.java | 18 +++--- 8 files changed, 137 insertions(+), 85 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java index d92cab510201..08284fc454b0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java @@ -18,6 +18,8 @@ package org.apache.paimon.rest; +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.options.Options; import org.apache.paimon.rest.exceptions.RESTException; import org.apache.paimon.rest.responses.ErrorResponse; @@ -47,19 +49,28 @@ /** HTTP client for REST catalog. */ public class HttpClient implements RESTClient { + private static final ObjectMapper OBJECT_MAPPER = RESTObjectMapper.create(); + private static final String THREAD_NAME = "REST-CATALOG-HTTP-CLIENT-THREAD-POOL"; + private static final MediaType MEDIA_TYPE = MediaType.parse("application/json"); + private final OkHttpClient okHttpClient; private final String uri; - private final ObjectMapper mapper; - private final ErrorHandler errorHandler; - private static final String THREAD_NAME = "REST-CATALOG-HTTP-CLIENT-THREAD-POOL"; - private static final MediaType MEDIA_TYPE = MediaType.parse("application/json"); + private ErrorHandler errorHandler; + + public HttpClient(Options options) { + this(HttpClientOptions.create(options)); + } public HttpClient(HttpClientOptions httpClientOptions) { this.uri = httpClientOptions.uri(); - this.mapper = httpClientOptions.mapper(); this.okHttpClient = createHttpClient(httpClientOptions); - this.errorHandler = httpClientOptions.errorHandler(); + this.errorHandler = DefaultErrorHandler.getInstance(); + } + + @VisibleForTesting + void setErrorHandler(ErrorHandler errorHandler) { + this.errorHandler = errorHandler; } @Override @@ -128,7 +139,7 @@ private T exec(Request request, Class responseType) errorHandler.accept(error); } if (responseType != null && responseBodyStr != null) { - return mapper.readValue(responseBodyStr, responseType); + return OBJECT_MAPPER.readValue(responseBodyStr, responseType); } else if (responseType == null) { return null; } else { @@ -142,7 +153,7 @@ private T exec(Request request, Class responseType) } private RequestBody buildRequestBody(RESTRequest body) throws JsonProcessingException { - return RequestBody.create(mapper.writeValueAsBytes(body), MEDIA_TYPE); + return RequestBody.create(OBJECT_MAPPER.writeValueAsBytes(body), MEDIA_TYPE); } private static OkHttpClient createHttpClient(HttpClientOptions httpClientOptions) { diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java index 694779cfdb86..00ae1a529e89 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java @@ -18,7 +18,9 @@ package org.apache.paimon.rest; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.paimon.options.Options; + +import javax.annotation.Nullable; import java.time.Duration; import java.util.Optional; @@ -27,25 +29,27 @@ public class HttpClientOptions { private final String uri; - private final Optional connectTimeout; - private final Optional readTimeout; - private final ObjectMapper mapper; + @Nullable private final Duration connectTimeout; + @Nullable private final Duration readTimeout; private final int threadPoolSize; - private final ErrorHandler errorHandler; public HttpClientOptions( String uri, - Optional connectTimeout, - Optional readTimeout, - ObjectMapper mapper, - int threadPoolSize, - ErrorHandler errorHandler) { + @Nullable Duration connectTimeout, + @Nullable Duration readTimeout, + int threadPoolSize) { this.uri = uri; this.connectTimeout = connectTimeout; this.readTimeout = readTimeout; - this.mapper = mapper; this.threadPoolSize = threadPoolSize; - this.errorHandler = errorHandler; + } + + public static HttpClientOptions create(Options options) { + return new HttpClientOptions( + options.get(RESTCatalogOptions.URI), + options.get(RESTCatalogOptions.CONNECTION_TIMEOUT), + options.get(RESTCatalogOptions.READ_TIMEOUT), + options.get(RESTCatalogOptions.THREAD_POOL_SIZE)); } public String uri() { @@ -53,22 +57,14 @@ public String uri() { } public Optional connectTimeout() { - return connectTimeout; + return Optional.ofNullable(connectTimeout); } public Optional readTimeout() { - return readTimeout; - } - - public ObjectMapper mapper() { - return mapper; + return Optional.ofNullable(readTimeout); } public int threadPoolSize() { return threadPoolSize; } - - public ErrorHandler errorHandler() { - return errorHandler; - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 4c8ac03bb26e..2c36f75a3713 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -34,8 +34,6 @@ import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.rest.auth.AuthSession; -import org.apache.paimon.rest.auth.CredentialsProvider; -import org.apache.paimon.rest.auth.CredentialsProviderFactory; import org.apache.paimon.rest.exceptions.AlreadyExistsException; import org.apache.paimon.rest.exceptions.ForbiddenException; import org.apache.paimon.rest.exceptions.NoSuchResourceException; @@ -64,18 +62,15 @@ import org.apache.paimon.utils.Preconditions; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; -import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; @@ -84,6 +79,8 @@ import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; +import static org.apache.paimon.rest.RESTUtil.extractPrefixMap; +import static org.apache.paimon.rest.auth.AuthSession.createAuthSession; import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool; @@ -91,7 +88,7 @@ public class RESTCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(RESTCatalog.class); - private static final ObjectMapper OBJECT_MAPPER = RESTObjectMapper.create(); + public static final String HEADER_PREFIX = "header."; private final RESTClient client; private final ResourcePaths resourcePaths; @@ -105,42 +102,18 @@ public RESTCatalog(CatalogContext context) { if (context.options().getOptional(CatalogOptions.WAREHOUSE).isPresent()) { throw new IllegalArgumentException("Can not config warehouse in RESTCatalog."); } - String uri = context.options().get(RESTCatalogOptions.URI); - Optional connectTimeout = - context.options().getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT); - Optional readTimeout = - context.options().getOptional(RESTCatalogOptions.READ_TIMEOUT); - Integer threadPoolSize = context.options().get(RESTCatalogOptions.THREAD_POOL_SIZE); - HttpClientOptions httpClientOptions = - new HttpClientOptions( - uri, - connectTimeout, - readTimeout, - OBJECT_MAPPER, - threadPoolSize, - DefaultErrorHandler.getInstance()); - this.client = new HttpClient(httpClientOptions); - Map baseHeader = configHeaders(context.options().toMap()); - CredentialsProvider credentialsProvider = - CredentialsProviderFactory.createCredentialsProvider( - context.options(), RESTCatalog.class.getClassLoader()); - if (credentialsProvider.keepRefreshed()) { - this.catalogAuth = - AuthSession.fromRefreshCredentialsProvider( - tokenRefreshExecutor(), baseHeader, credentialsProvider); - } else { - this.catalogAuth = new AuthSession(baseHeader, credentialsProvider); - } + this.client = new HttpClient(context.options()); + this.catalogAuth = createAuthSession(context.options(), tokenRefreshExecutor()); + Map initHeaders = RESTUtil.merge( - configHeaders(context.options().toMap()), this.catalogAuth.getHeaders()); - + extractPrefixMap(context.options(), HEADER_PREFIX), + catalogAuth.getHeaders()); this.options = new Options( client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, initHeaders) .merge(context.options().toMap())); - this.resourcePaths = - ResourcePaths.forCatalogProperties(options.get(RESTCatalogInternalOptions.PREFIX)); + this.resourcePaths = ResourcePaths.forCatalogProperties(options); try { String warehouseStr = options.get(CatalogOptions.WAREHOUSE); @@ -155,6 +128,14 @@ public RESTCatalog(CatalogContext context) { } } + protected RESTCatalog(Options options, FileIO fileIO) { + this.client = new HttpClient(options); + this.catalogAuth = createAuthSession(options, tokenRefreshExecutor()); + this.options = options; + this.resourcePaths = ResourcePaths.forCatalogProperties(options); + this.fileIO = fileIO; + } + @Override public String warehouse() { return options.get(CatalogOptions.WAREHOUSE); @@ -167,7 +148,7 @@ public Map options() { @Override public CatalogLoader catalogLoader() { - throw new UnsupportedOperationException("Not supported yet."); + return new RESTCatalogLoader(options, fileIO); } @Override @@ -452,7 +433,9 @@ private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistEx fileIO(), new Path(response.getPath()), TableSchema.create(response.getSchemaId(), response.getSchema()), - new CatalogEnvironment(identifier, null, Lock.emptyFactory(), null)); + // TODO add uuid from server + new CatalogEnvironment( + identifier, null, Lock.emptyFactory(), catalogLoader())); CoreOptions options = table.coreOptions(); if (options.type() == TableType.OBJECT_TABLE) { String objectLocation = options.objectLocation(); @@ -467,10 +450,6 @@ private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistEx return table; } - private static Map configHeaders(Map properties) { - return RESTUtil.extractPrefixMap(properties, "header."); - } - private Map headers() { return catalogAuth.getHeaders(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java new file mode 100644 index 000000000000..b4c5171ab91f --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogLoader; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.options.Options; + +/** Loader to create {@link RESTCatalog}. */ +public class RESTCatalogLoader implements CatalogLoader { + + private static final long serialVersionUID = 1L; + + private final Options options; + private final FileIO fileIO; + + public RESTCatalogLoader(Options options, FileIO fileIO) { + this.options = options; + this.fileIO = fileIO; + } + + @Override + public Catalog load() { + return new RESTCatalog(options, fileIO); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTUtil.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTUtil.java index 3d42e99fa6d5..f957ae48afc9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTUtil.java @@ -18,6 +18,7 @@ package org.apache.paimon.rest; +import org.apache.paimon.options.Options; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; @@ -27,6 +28,11 @@ /** Util for REST. */ public class RESTUtil { + + public static Map extractPrefixMap(Options options, String prefix) { + return extractPrefixMap(options.toMap(), prefix); + } + public static Map extractPrefixMap( Map properties, String prefix) { Preconditions.checkNotNull(properties, "Invalid properties map: null"); diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java index ebfdd2db1eec..780582c33cb7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java @@ -18,6 +18,8 @@ package org.apache.paimon.rest; +import org.apache.paimon.options.Options; + import java.util.StringJoiner; /** Resource paths for REST catalog. */ @@ -26,8 +28,8 @@ public class ResourcePaths { public static final String V1_CONFIG = "/v1/config"; private static final StringJoiner SLASH = new StringJoiner("/"); - public static ResourcePaths forCatalogProperties(String prefix) { - return new ResourcePaths(prefix); + public static ResourcePaths forCatalogProperties(Options options) { + return new ResourcePaths(options.get(RESTCatalogInternalOptions.PREFIX)); } private final String prefix; diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java index 3ca7590e5f96..198b098687d4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java @@ -19,6 +19,8 @@ package org.apache.paimon.rest.auth; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.options.Options; +import org.apache.paimon.rest.RESTCatalog; import org.apache.paimon.rest.RESTUtil; import org.slf4j.Logger; @@ -29,6 +31,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.apache.paimon.rest.RESTCatalog.HEADER_PREFIX; +import static org.apache.paimon.rest.RESTUtil.extractPrefixMap; + /** Auth session. */ public class AuthSession { @@ -140,4 +145,18 @@ private static void scheduleTokenRefresh( log.warn("Failed to refresh token after {} retries.", TOKEN_REFRESH_NUM_RETRIES); } } + + public static AuthSession createAuthSession( + Options options, ScheduledExecutorService refreshExecutor) { + Map baseHeader = extractPrefixMap(options, HEADER_PREFIX); + CredentialsProvider credentialsProvider = + CredentialsProviderFactory.createCredentialsProvider( + options, RESTCatalog.class.getClassLoader()); + if (credentialsProvider.keepRefreshed()) { + return AuthSession.fromRefreshCredentialsProvider( + refreshExecutor, baseHeader, credentialsProvider); + } else { + return new AuthSession(baseHeader, credentialsProvider); + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java index a3b06b8ce3a9..3baff1ccaa43 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java @@ -33,7 +33,6 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; @@ -44,15 +43,17 @@ /** Test for {@link HttpClient}. */ public class HttpClientTest { + private static final String MOCK_PATH = "/v1/api/mock"; + private static final String TOKEN = "token"; + + private final ObjectMapper objectMapper = RESTObjectMapper.create(); + private MockWebServer mockWebServer; private HttpClient httpClient; - private ObjectMapper objectMapper = RESTObjectMapper.create(); private ErrorHandler errorHandler; private MockRESTData mockResponseData; private String mockResponseDataStr; private Map headers; - private static final String MOCK_PATH = "/v1/api/mock"; - private static final String TOKEN = "token"; @Before public void setUp() throws IOException { @@ -61,16 +62,11 @@ public void setUp() throws IOException { String baseUrl = mockWebServer.url("").toString(); errorHandler = mock(ErrorHandler.class); HttpClientOptions httpClientOptions = - new HttpClientOptions( - baseUrl, - Optional.of(Duration.ofSeconds(3)), - Optional.of(Duration.ofSeconds(3)), - objectMapper, - 1, - errorHandler); + new HttpClientOptions(baseUrl, Duration.ofSeconds(3), Duration.ofSeconds(3), 1); mockResponseData = new MockRESTData(MOCK_PATH); mockResponseDataStr = objectMapper.writeValueAsString(mockResponseData); httpClient = new HttpClient(httpClientOptions); + httpClient.setErrorHandler(errorHandler); CredentialsProvider credentialsProvider = new BearTokenCredentialsProvider(TOKEN); headers = credentialsProvider.authHeader(); }