From 74b509ec46802b44db2440c0805a1a7f883730e6 Mon Sep 17 00:00:00 2001 From: yantian Date: Fri, 17 Jan 2025 17:56:08 +0800 Subject: [PATCH 1/7] support data token --- .../apache/paimon/options/CatalogOptions.java | 6 + .../paimon/catalog/AbstractCatalog.java | 3 +- .../apache/paimon/catalog/CatalogUtils.java | 11 +- .../org/apache/paimon/rest/RESTCatalog.java | 40 ++++- .../paimon/rest/RefreshCredentialFileIO.java | 146 ++++++++++++++++++ .../org/apache/paimon/rest/ResourcePaths.java | 4 + .../GetTableCredentialsResponse.java | 61 ++++++++ 7 files changed, 259 insertions(+), 12 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index d0cfbeaf39ed..643fb0ce19b3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -149,4 +149,10 @@ public class CatalogOptions { "Whether to support format tables, format table corresponds to a regular csv, parquet or orc table, allowing read and write operations. " + "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in" + " the metastore and need to be manually added as separate partition operations."); + + public static final ConfigOption FILE_IO_REFRESH_CREDENTIAL_ENABLE = + ConfigOptions.key("file-io-refresh-credential.enabled") + .booleanType() + .defaultValue(false) + .withDescription("Whether to support file io refresh credential."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index aa93b1ba3256..aeeadc176579 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -369,7 +369,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException { SnapshotCommit.Factory commitFactory = new RenamingSnapshotCommit.Factory( lockFactory().orElse(null), lockContext().orElse(null)); - return CatalogUtils.loadTable(this, identifier, this::loadTableMetadata, commitFactory); + return CatalogUtils.loadTable( + this, this.fileIO(), identifier, this::loadTableMetadata, commitFactory); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index fbd510692c30..1915951c8452 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -170,6 +170,7 @@ public static List listPartitionsFromFileSystem(Table table) { */ public static Table loadTable( Catalog catalog, + FileIO fileIO, Identifier identifier, TableMetadata.Loader metadataLoader, SnapshotCommit.Factory commitFactory) @@ -189,11 +190,10 @@ public static Table loadTable( new CatalogEnvironment( identifier, metadata.uuid(), catalog.catalogLoader(), commitFactory); Path path = new Path(schema.options().get(PATH.key())); - FileStoreTable table = - FileStoreTableFactory.create(catalog.fileIO(), path, schema, catalogEnv); + FileStoreTable table = FileStoreTableFactory.create(fileIO, path, schema, catalogEnv); if (options.type() == TableType.OBJECT_TABLE) { - table = toObjectTable(catalog, table); + table = toObjectTable(fileIO, table); } if (identifier.isSystemTable()) { @@ -265,10 +265,11 @@ private static FormatTable toFormatTable(Identifier identifier, TableSchema sche .build(); } - private static ObjectTable toObjectTable(Catalog catalog, FileStoreTable underlyingTable) { + private static ObjectTable toObjectTable(FileIO fileIO, FileStoreTable underlyingTable) { CoreOptions options = underlyingTable.coreOptions(); String objectLocation = options.objectLocation(); - FileIO objectFileIO = catalog.fileIO(new Path(objectLocation)); + // todo: check whether here is ok. + FileIO objectFileIO = fileIO; return ObjectTable.builder() .underlyingTable(underlyingTable) .objectLocation(objectLocation) diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 87b8e8cb9612..37d21cdf4361 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -109,6 +109,7 @@ public class RESTCatalog implements Catalog { private final ResourcePaths resourcePaths; private final AuthSession catalogAuth; private final Options options; + private final boolean fileIORefreshCredentialEnable; private final FileIO fileIO; private volatile ScheduledExecutorService refreshExecutor = null; @@ -130,13 +131,20 @@ public RESTCatalog(CatalogContext context) { .merge(context.options().toMap())); this.resourcePaths = ResourcePaths.forCatalogProperties(options); + this.fileIORefreshCredentialEnable = + options.get(CatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE); try { - String warehouseStr = options.get(CatalogOptions.WAREHOUSE); - this.fileIO = - FileIO.get( - new Path(warehouseStr), - CatalogContext.create( - options, context.preferIO(), context.fallbackIO())); + if (fileIORefreshCredentialEnable) { + // todo: check whether is ok + this.fileIO = null; + } else { + String warehouseStr = options.get(CatalogOptions.WAREHOUSE); + this.fileIO = + FileIO.get( + new Path(warehouseStr), + CatalogContext.create( + options, context.preferIO(), context.fallbackIO())); + } } catch (IOException e) { LOG.warn("Can not get FileIO from options."); throw new RuntimeException(e); @@ -149,6 +157,9 @@ protected RESTCatalog(Options options, FileIO fileIO) { this.options = options; this.resourcePaths = ResourcePaths.forCatalogProperties(options); this.fileIO = fileIO; + this.fileIORefreshCredentialEnable = + options.get(CatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE); + ; } @Override @@ -168,11 +179,27 @@ public RESTCatalogLoader catalogLoader() { @Override public FileIO fileIO() { + if (fileIORefreshCredentialEnable) { + throw new UnsupportedOperationException(); + } return fileIO; } + // todo: need cache table identifier location @Override public FileIO fileIO(Path path) { + if (fileIORefreshCredentialEnable) { + throw new UnsupportedOperationException(); + } + return fileIO; + } + + // todo: need cache table identifier fileIO + public FileIO fileIO(Identifier identifier) { + if (fileIORefreshCredentialEnable) { + return new RefreshCredentialFileIO( + resourcePaths, catalogAuth, options, client, identifier); + } return fileIO; } @@ -288,6 +315,7 @@ public List listTables(String databaseName) throws DatabaseNotExistExcep public Table getTable(Identifier identifier) throws TableNotExistException { return CatalogUtils.loadTable( this, + this.fileIO(identifier), identifier, this::loadTableMetadata, new RESTSnapshotCommitFactory(catalogLoader())); diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java b/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java new file mode 100644 index 000000000000..1f784bdc0c32 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest; + +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.FileStatus; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; +import org.apache.paimon.rest.auth.AuthSession; +import org.apache.paimon.rest.responses.GetTableCredentialsResponse; + +import java.io.IOException; +import java.util.Date; +import java.util.Map; + +/** A {@link FileIO} to support refresh credential. */ +public class RefreshCredentialFileIO implements FileIO { + private final ResourcePaths resourcePaths; + private final AuthSession catalogAuth; + private RESTClient client; + protected Options options; + private Identifier identifier; + private Date expireAt; + private transient volatile FileIO lazyFileIO; + + public RefreshCredentialFileIO( + ResourcePaths resourcePaths, + AuthSession catalogAuth, + Options options, + RESTClient client, + Identifier identifier) { + this.resourcePaths = resourcePaths; + this.catalogAuth = catalogAuth; + this.options = options; + this.identifier = identifier; + this.client = client; + } + + @Override + public void configure(CatalogContext context) { + // Do not get Hadoop Configuration in CatalogOptions + // The class is in different classloader from pluginClassLoader! + this.options = context.options(); + } + + @Override + public SeekableInputStream newInputStream(Path path) throws IOException { + return fileIO().newInputStream(path); + } + + @Override + public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException { + return fileIO().newOutputStream(path, overwrite); + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + return fileIO().getFileStatus(path); + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + return fileIO().listStatus(path); + } + + @Override + public boolean exists(Path path) throws IOException { + return fileIO().exists(path); + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + return fileIO().delete(path, recursive); + } + + @Override + public boolean mkdirs(Path path) throws IOException { + return fileIO().mkdirs(path); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + return fileIO().rename(src, dst); + } + + @Override + public boolean isObjectStore() { + try { + return fileIO().isObjectStore(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private FileIO fileIO() throws IOException { + if (lazyFileIO == null || shouldRefresh()) { + synchronized (this) { + if (lazyFileIO == null || shouldRefresh()) { + GetTableCredentialsResponse response = getCredential(); + expireAt = response.getExpiresAt(); + Map conf = + RESTUtil.merge(options.toMap(), response.getCredential()); + Options updateCredentialOption = new Options(conf); + lazyFileIO = + FileIO.get( + new Path(updateCredentialOption.get(CatalogOptions.WAREHOUSE)), + CatalogContext.create(updateCredentialOption)); + } + } + } + return lazyFileIO; + } + + private GetTableCredentialsResponse getCredential() { + return client.get( + resourcePaths.tableCredentials( + identifier.getDatabaseName(), identifier.getObjectName()), + GetTableCredentialsResponse.class, + catalogAuth.getHeaders()); + } + + private boolean shouldRefresh() { + return expireAt != null && expireAt.getTime() > System.currentTimeMillis(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java index d77475fe40dc..1e843f99cb0e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java @@ -70,6 +70,10 @@ public String commitTable(String databaseName) { return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, "commit"); } + public String tableCredentials(String databaseName, String tableName) { + return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "credentials"); + } + public String partitions(String databaseName, String tableName) { return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions"); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java new file mode 100644 index 000000000000..d40e317de757 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.responses; + +import org.apache.paimon.rest.RESTResponse; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Date; +import java.util.Map; + +/** Response for table credentials. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class GetTableCredentialsResponse implements RESTResponse { + + private static final String FIELD_CREDENTIAL = "credential"; + private static final String FIELD_EXPIREAT = "expiresAt"; + + @JsonProperty(FIELD_CREDENTIAL) + private final Map credential; + + @JsonProperty(FIELD_EXPIREAT) + private Date expiresAt; + + @JsonCreator + public GetTableCredentialsResponse( + @JsonProperty(FIELD_EXPIREAT) Date expiresAt, + @JsonProperty(FIELD_CREDENTIAL) Map credential) { + this.expiresAt = expiresAt; + this.credential = credential; + } + + @JsonGetter(FIELD_CREDENTIAL) + public Map getCredential() { + return credential; + } + + @JsonGetter(FIELD_EXPIREAT) + public Date getExpiresAt() { + return expiresAt; + } +} From cb44d267b97d794f47b668ecc84d34ba66a0df28 Mon Sep 17 00:00:00 2001 From: yantian Date: Mon, 20 Jan 2025 10:22:05 +0800 Subject: [PATCH 2/7] add fileIO cache in RESTCatalog and fix some error define --- .../apache/paimon/options/CatalogOptions.java | 6 ---- .../org/apache/paimon/rest/RESTCatalog.java | 29 ++++++++++++++----- .../paimon/rest/RESTCatalogOptions.java | 6 ++++ .../paimon/rest/RefreshCredentialFileIO.java | 14 +++++---- 4 files changed, 35 insertions(+), 20 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index 643fb0ce19b3..d0cfbeaf39ed 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -149,10 +149,4 @@ public class CatalogOptions { "Whether to support format tables, format table corresponds to a regular csv, parquet or orc table, allowing read and write operations. " + "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in" + " the metastore and need to be manually added as separate partition operations."); - - public static final ConfigOption FILE_IO_REFRESH_CREDENTIAL_ENABLE = - ConfigOptions.key("file-io-refresh-credential.enabled") - .booleanType() - .defaultValue(false) - .withDescription("Whether to support file io refresh credential."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 37d21cdf4361..09ea4eb3dd58 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -73,6 +73,9 @@ import org.apache.paimon.view.ViewImpl; import org.apache.paimon.view.ViewSchema; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.slf4j.Logger; @@ -111,6 +114,7 @@ public class RESTCatalog implements Catalog { private final Options options; private final boolean fileIORefreshCredentialEnable; private final FileIO fileIO; + protected Cache tableFullName2FileIO; private volatile ScheduledExecutorService refreshExecutor = null; @@ -132,10 +136,15 @@ public RESTCatalog(CatalogContext context) { this.resourcePaths = ResourcePaths.forCatalogProperties(options); this.fileIORefreshCredentialEnable = - options.get(CatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE); + options.get(RESTCatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE); try { if (fileIORefreshCredentialEnable) { - // todo: check whether is ok + tableFullName2FileIO = + Caffeine.newBuilder() + .softValues() + .executor(Runnable::run) + .ticker(Ticker.systemTicker()) + .build(); this.fileIO = null; } else { String warehouseStr = options.get(CatalogOptions.WAREHOUSE); @@ -158,8 +167,7 @@ protected RESTCatalog(Options options, FileIO fileIO) { this.resourcePaths = ResourcePaths.forCatalogProperties(options); this.fileIO = fileIO; this.fileIORefreshCredentialEnable = - options.get(CatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE); - ; + options.get(RESTCatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE); } @Override @@ -185,20 +193,25 @@ public FileIO fileIO() { return fileIO; } - // todo: need cache table identifier location @Override public FileIO fileIO(Path path) { if (fileIORefreshCredentialEnable) { + // todo: check path's identifier and get FileIO throw new UnsupportedOperationException(); } return fileIO; } - // todo: need cache table identifier fileIO public FileIO fileIO(Identifier identifier) { if (fileIORefreshCredentialEnable) { - return new RefreshCredentialFileIO( - resourcePaths, catalogAuth, options, client, identifier); + FileIO tableFileIO = tableFullName2FileIO.getIfPresent(identifier.getFullName()); + if (tableFileIO != null) { + tableFileIO = + new RefreshCredentialFileIO( + resourcePaths, catalogAuth, options, client, identifier); + tableFullName2FileIO.put(identifier.getFullName(), tableFileIO); + return tableFileIO; + } } return fileIO; } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index 843228fa0707..61aed5f7038f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -77,4 +77,10 @@ public class RESTCatalogOptions { .stringType() .noDefaultValue() .withDescription("REST Catalog auth token provider path."); + + public static final ConfigOption FILE_IO_REFRESH_CREDENTIAL_ENABLE = + ConfigOptions.key("file-io-refresh-credential.enabled") + .booleanType() + .defaultValue(false) + .withDescription("Whether to support file io refresh credential."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java b/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java index 1f784bdc0c32..ca9dbacc58c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java @@ -36,12 +36,16 @@ /** A {@link FileIO} to support refresh credential. */ public class RefreshCredentialFileIO implements FileIO { + + private static final long serialVersionUID = 1L; + private final ResourcePaths resourcePaths; private final AuthSession catalogAuth; - private RESTClient client; protected Options options; - private Identifier identifier; + private final Identifier identifier; + private final transient RESTClient client; private Date expireAt; + private Map credential; private transient volatile FileIO lazyFileIO; public RefreshCredentialFileIO( @@ -59,8 +63,6 @@ public RefreshCredentialFileIO( @Override public void configure(CatalogContext context) { - // Do not get Hadoop Configuration in CatalogOptions - // The class is in different classloader from pluginClassLoader! this.options = context.options(); } @@ -119,8 +121,8 @@ private FileIO fileIO() throws IOException { if (lazyFileIO == null || shouldRefresh()) { GetTableCredentialsResponse response = getCredential(); expireAt = response.getExpiresAt(); - Map conf = - RESTUtil.merge(options.toMap(), response.getCredential()); + credential = response.getCredential(); + Map conf = RESTUtil.merge(options.toMap(), credential); Options updateCredentialOption = new Options(conf); lazyFileIO = FileIO.get( From 183ba88cfd4e7f44450b9f3406f1151deb7456df Mon Sep 17 00:00:00 2001 From: yantian Date: Mon, 20 Jan 2025 11:10:42 +0800 Subject: [PATCH 3/7] fix get fileIO for object table --- .../paimon/catalog/AbstractCatalog.java | 5 ++-- .../org/apache/paimon/catalog/Catalog.java | 2 +- .../apache/paimon/catalog/CatalogUtils.java | 12 ++++----- .../paimon/catalog/DelegateCatalog.java | 4 +-- .../org/apache/paimon/rest/RESTCatalog.java | 25 ++++++------------- 5 files changed, 19 insertions(+), 29 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index aeeadc176579..3877e944fc78 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -92,7 +92,7 @@ public FileIO fileIO() { } @Override - public FileIO fileIO(Path path) { + public FileIO fileIO(Identifier identifier, Path path) { return fileIO; } @@ -369,8 +369,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException { SnapshotCommit.Factory commitFactory = new RenamingSnapshotCommit.Factory( lockFactory().orElse(null), lockContext().orElse(null)); - return CatalogUtils.loadTable( - this, this.fileIO(), identifier, this::loadTableMetadata, commitFactory); + return CatalogUtils.loadTable(this, identifier, this::loadTableMetadata, commitFactory); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index a0d78b268880..04448ef26064 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -376,7 +376,7 @@ default void repairTable(Identifier identifier) throws TableNotExistException { FileIO fileIO(); /** {@link FileIO} of this catalog. */ - FileIO fileIO(Path path); + FileIO fileIO(Identifier identifier, Path path); /** Catalog options for re-creating this catalog. */ Map options(); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index 1915951c8452..00db34284102 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -170,7 +170,6 @@ public static List listPartitionsFromFileSystem(Table table) { */ public static Table loadTable( Catalog catalog, - FileIO fileIO, Identifier identifier, TableMetadata.Loader metadataLoader, SnapshotCommit.Factory commitFactory) @@ -190,10 +189,12 @@ public static Table loadTable( new CatalogEnvironment( identifier, metadata.uuid(), catalog.catalogLoader(), commitFactory); Path path = new Path(schema.options().get(PATH.key())); - FileStoreTable table = FileStoreTableFactory.create(fileIO, path, schema, catalogEnv); + FileStoreTable table = + FileStoreTableFactory.create( + catalog.fileIO(identifier, path), path, schema, catalogEnv); if (options.type() == TableType.OBJECT_TABLE) { - table = toObjectTable(fileIO, table); + table = toObjectTable(catalog, table); } if (identifier.isSystemTable()) { @@ -265,11 +266,10 @@ private static FormatTable toFormatTable(Identifier identifier, TableSchema sche .build(); } - private static ObjectTable toObjectTable(FileIO fileIO, FileStoreTable underlyingTable) { + private static ObjectTable toObjectTable(Catalog catalog, FileStoreTable underlyingTable) { CoreOptions options = underlyingTable.coreOptions(); String objectLocation = options.objectLocation(); - // todo: check whether here is ok. - FileIO objectFileIO = fileIO; + FileIO objectFileIO = catalog.fileIO(new Path(objectLocation)); return ObjectTable.builder() .underlyingTable(underlyingTable) .objectLocation(objectLocation) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 847485a7a16b..5c12b89d1f14 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -63,8 +63,8 @@ public FileIO fileIO() { } @Override - public FileIO fileIO(Path path) { - return wrapped.fileIO(path); + public FileIO fileIO(Identifier identifier, Path path) { + return wrapped.fileIO(identifier, path); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 09ea4eb3dd58..2846e9530870 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -114,7 +114,7 @@ public class RESTCatalog implements Catalog { private final Options options; private final boolean fileIORefreshCredentialEnable; private final FileIO fileIO; - protected Cache tableFullName2FileIO; + protected Cache path2FileIO; private volatile ScheduledExecutorService refreshExecutor = null; @@ -139,7 +139,7 @@ public RESTCatalog(CatalogContext context) { options.get(RESTCatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE); try { if (fileIORefreshCredentialEnable) { - tableFullName2FileIO = + path2FileIO = Caffeine.newBuilder() .softValues() .executor(Runnable::run) @@ -194,24 +194,16 @@ public FileIO fileIO() { } @Override - public FileIO fileIO(Path path) { + public FileIO fileIO(Identifier identifier, Path path) { if (fileIORefreshCredentialEnable) { - // todo: check path's identifier and get FileIO - throw new UnsupportedOperationException(); - } - return fileIO; - } - - public FileIO fileIO(Identifier identifier) { - if (fileIORefreshCredentialEnable) { - FileIO tableFileIO = tableFullName2FileIO.getIfPresent(identifier.getFullName()); - if (tableFileIO != null) { - tableFileIO = + FileIO pathFileIO = path2FileIO.getIfPresent(path); + if (pathFileIO == null) { + pathFileIO = new RefreshCredentialFileIO( resourcePaths, catalogAuth, options, client, identifier); - tableFullName2FileIO.put(identifier.getFullName(), tableFileIO); - return tableFileIO; + path2FileIO.put(path, pathFileIO); } + return pathFileIO; } return fileIO; } @@ -328,7 +320,6 @@ public List listTables(String databaseName) throws DatabaseNotExistExcep public Table getTable(Identifier identifier) throws TableNotExistException { return CatalogUtils.loadTable( this, - this.fileIO(identifier), identifier, this::loadTableMetadata, new RESTSnapshotCommitFactory(catalogLoader())); From 3eb1af8df246aed55123c64670dae811b32cdc81 Mon Sep 17 00:00:00 2001 From: yantian Date: Mon, 20 Jan 2025 11:43:53 +0800 Subject: [PATCH 4/7] add test for refresh file IO in RESTCatalog --- .../apache/paimon/catalog/CatalogUtils.java | 7 +++--- .../paimon/rest/RefreshCredentialFileIO.java | 3 ++- .../apache/paimon/rest/RESTCatalogServer.java | 17 ++++++++++++++ .../apache/paimon/rest/RESTCatalogTest.java | 23 ++++++++++++++++++- 4 files changed, 45 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index 00db34284102..d6106e94f88f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -194,7 +194,7 @@ public static Table loadTable( catalog.fileIO(identifier, path), path, schema, catalogEnv); if (options.type() == TableType.OBJECT_TABLE) { - table = toObjectTable(catalog, table); + table = toObjectTable(catalog, identifier, table); } if (identifier.isSystemTable()) { @@ -266,10 +266,11 @@ private static FormatTable toFormatTable(Identifier identifier, TableSchema sche .build(); } - private static ObjectTable toObjectTable(Catalog catalog, FileStoreTable underlyingTable) { + private static ObjectTable toObjectTable( + Catalog catalog, Identifier identifier, FileStoreTable underlyingTable) { CoreOptions options = underlyingTable.coreOptions(); String objectLocation = options.objectLocation(); - FileIO objectFileIO = catalog.fileIO(new Path(objectLocation)); + FileIO objectFileIO = catalog.fileIO(identifier, new Path(objectLocation)); return ObjectTable.builder() .underlyingTable(underlyingTable) .objectLocation(objectLocation) diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java b/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java index ca9dbacc58c0..803370fa7563 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java @@ -43,9 +43,9 @@ public class RefreshCredentialFileIO implements FileIO { private final AuthSession catalogAuth; protected Options options; private final Identifier identifier; - private final transient RESTClient client; private Date expireAt; private Map credential; + private final transient RESTClient client; private transient volatile FileIO lazyFileIO; public RefreshCredentialFileIO( @@ -134,6 +134,7 @@ private FileIO fileIO() throws IOException { return lazyFileIO; } + // todo: handle exception private GetTableCredentialsResponse getCredential() { return client.get( resourcePaths.tableCredentials( diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 91024867b7ea..e396bca75b28 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -42,6 +42,7 @@ import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.ErrorResponseResourceType; import org.apache.paimon.rest.responses.GetDatabaseResponse; +import org.apache.paimon.rest.responses.GetTableCredentialsResponse; import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.GetViewResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; @@ -63,8 +64,10 @@ import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.util.Date; import java.util.List; import java.util.UUID; @@ -149,6 +152,10 @@ public MockResponse dispatch(RecordedRequest request) { resources.length == 3 && "tables".equals(resources[1]) && "commit".equals(resources[2]); + boolean isTableCredentials = + resources.length == 4 + && "tables".equals(resources[1]) + && "credentials".equals(resources[3]); boolean isPartitions = resources.length == 4 && "tables".equals(resources[1]) @@ -202,6 +209,16 @@ public MockResponse dispatch(RecordedRequest request) { } else if (isPartitions) { String tableName = resources[2]; return partitionsApiHandler(catalog, request, databaseName, tableName); + } else if (isTableCredentials) { + GetTableCredentialsResponse getTableCredentialsResponse = + new GetTableCredentialsResponse( + new Date(System.currentTimeMillis()), + ImmutableMap.of("key", "value")); + return new MockResponse() + .setResponseCode(200) + .setBody( + OBJECT_MAPPER.writeValueAsString( + getTableCredentialsResponse)); } else if (isTableRename) { return renameTableApiHandler(catalog, request); } else if (isTableCommit) { diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index d1ce64b6c543..752f492078b7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -26,6 +26,7 @@ import org.apache.paimon.partition.Partition; import org.apache.paimon.rest.exceptions.NotAuthorizedException; import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -49,12 +50,12 @@ class RESTCatalogTest extends CatalogTestBase { private RESTCatalogServer restCatalogServer; + private String initToken = "init_token"; @BeforeEach @Override public void setUp() throws Exception { super.setUp(); - String initToken = "init_token"; restCatalogServer = new RESTCatalogServer(warehouse, initToken); restCatalogServer.start(); Options options = new Options(); @@ -107,6 +108,26 @@ void testListPartitionsFromFile() throws Exception { assertEquals(0, result.size()); } + @Test + void testRefreshFileIO() throws Exception { + Options options = new Options(); + options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl()); + options.set(RESTCatalogOptions.TOKEN, initToken); + options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1); + options.set(RESTCatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE, true); + this.catalog = new RESTCatalog(CatalogContext.create(options)); + List identifiers = + Lists.newArrayList( + Identifier.create("test_db_a", "test_table_a"), + Identifier.create("test_db_b", "test_table_b"), + Identifier.create("test_db_c", "test_table_c")); + for (Identifier identifier : identifiers) { + createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1")); + FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(identifier); + assertEquals(true, fileStoreTable.fileIO().exists(fileStoreTable.location())); + } + } + @Override protected boolean supportsFormatTable() { return true; From a3b478f08322f39649d9bf40d17b7ff9fd70f1cc Mon Sep 17 00:00:00 2001 From: yantian Date: Mon, 20 Jan 2025 16:43:51 +0800 Subject: [PATCH 5/7] add open api for table credentials --- .../paimon/rest/RefreshCredentialFileIO.java | 7 ++- .../GetTableCredentialsResponse.java | 17 ++++--- .../apache/paimon/rest/RESTCatalogServer.java | 3 +- paimon-open-api/rest-catalog-open-api.yaml | 47 +++++++++++++++++++ .../open/api/RESTCatalogController.java | 28 +++++++++++ 5 files changed, 87 insertions(+), 15 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java b/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java index 803370fa7563..b55486887d55 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java @@ -31,7 +31,6 @@ import org.apache.paimon.rest.responses.GetTableCredentialsResponse; import java.io.IOException; -import java.util.Date; import java.util.Map; /** A {@link FileIO} to support refresh credential. */ @@ -43,7 +42,7 @@ public class RefreshCredentialFileIO implements FileIO { private final AuthSession catalogAuth; protected Options options; private final Identifier identifier; - private Date expireAt; + private Long expireAtMillis; private Map credential; private final transient RESTClient client; private transient volatile FileIO lazyFileIO; @@ -120,7 +119,7 @@ private FileIO fileIO() throws IOException { synchronized (this) { if (lazyFileIO == null || shouldRefresh()) { GetTableCredentialsResponse response = getCredential(); - expireAt = response.getExpiresAt(); + expireAtMillis = response.getExpiresAtMillis(); credential = response.getCredential(); Map conf = RESTUtil.merge(options.toMap(), credential); Options updateCredentialOption = new Options(conf); @@ -144,6 +143,6 @@ private GetTableCredentialsResponse getCredential() { } private boolean shouldRefresh() { - return expireAt != null && expireAt.getTime() > System.currentTimeMillis(); + return expireAtMillis != null && expireAtMillis > System.currentTimeMillis(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java index d40e317de757..2792940ff6b1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java @@ -25,7 +25,6 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import java.util.Date; import java.util.Map; /** Response for table credentials. */ @@ -33,19 +32,19 @@ public class GetTableCredentialsResponse implements RESTResponse { private static final String FIELD_CREDENTIAL = "credential"; - private static final String FIELD_EXPIREAT = "expiresAt"; + private static final String FIELD_EXPIREAT_MILLIS = "expiresAtMillis"; @JsonProperty(FIELD_CREDENTIAL) private final Map credential; - @JsonProperty(FIELD_EXPIREAT) - private Date expiresAt; + @JsonProperty(FIELD_EXPIREAT_MILLIS) + private long expiresAtMillis; @JsonCreator public GetTableCredentialsResponse( - @JsonProperty(FIELD_EXPIREAT) Date expiresAt, + @JsonProperty(FIELD_EXPIREAT_MILLIS) long expiresAtMillis, @JsonProperty(FIELD_CREDENTIAL) Map credential) { - this.expiresAt = expiresAt; + this.expiresAtMillis = expiresAtMillis; this.credential = credential; } @@ -54,8 +53,8 @@ public Map getCredential() { return credential; } - @JsonGetter(FIELD_EXPIREAT) - public Date getExpiresAt() { - return expiresAt; + @JsonGetter(FIELD_EXPIREAT_MILLIS) + public long getExpiresAtMillis() { + return expiresAtMillis; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index e396bca75b28..16de35969dc6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -67,7 +67,6 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; import java.io.IOException; -import java.util.Date; import java.util.List; import java.util.UUID; @@ -212,7 +211,7 @@ public MockResponse dispatch(RecordedRequest request) { } else if (isTableCredentials) { GetTableCredentialsResponse getTableCredentialsResponse = new GetTableCredentialsResponse( - new Date(System.currentTimeMillis()), + System.currentTimeMillis(), ImmutableMap.of("key", "value")); return new MockResponse() .setResponseCode(200) diff --git a/paimon-open-api/rest-catalog-open-api.yaml b/paimon-open-api/rest-catalog-open-api.yaml index 02ea7de8d0df..128514d7a59e 100644 --- a/paimon-open-api/rest-catalog-open-api.yaml +++ b/paimon-open-api/rest-catalog-open-api.yaml @@ -432,6 +432,43 @@ paths: $ref: '#/components/schemas/ErrorResponse' "500": description: Internal Server Error + /v1/{prefix}/databases/{database}/tables/{table}/credentials: + get: + tags: + - table + summary: List credentials + operationId: listCredentials + parameters: + - name: prefix + in: path + required: true + schema: + type: string + - name: database + in: path + required: true + schema: + type: string + - name: table + in: path + required: true + schema: + type: string + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/GetTableCredentialsResponse' + "404": + description: Resource not found + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + "500": + description: Internal Server Error /v1/{prefix}/databases/{database}/tables/{table}/partitions: get: tags: @@ -1166,6 +1203,16 @@ components: properties: success: type: boolean + GetTableCredentialsResponse: + type: object + properties: + expiresAt: + type: integer + format: int64 + credentials: + type: object + additionalProperties: + type: string AlterDatabaseRequest: type: object properties: diff --git a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java index e657407a47c3..c8eae97dec64 100644 --- a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java +++ b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java @@ -37,6 +37,7 @@ import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; +import org.apache.paimon.rest.responses.GetTableCredentialsResponse; import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.GetViewResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; @@ -49,6 +50,7 @@ import org.apache.paimon.view.ViewSchema; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import io.swagger.v3.oas.annotations.Operation; @@ -355,6 +357,32 @@ public CommitTableResponse commitTable( return new CommitTableResponse(true); } + @Operation( + summary = "List credentials", + tags = {"table"}) + @ApiResponses({ + @ApiResponse( + responseCode = "200", + content = { + @Content(schema = @Schema(implementation = GetTableCredentialsResponse.class)) + }), + @ApiResponse( + responseCode = "404", + description = "Resource not found", + content = {@Content(schema = @Schema(implementation = ErrorResponse.class))}), + @ApiResponse( + responseCode = "500", + content = {@Content(schema = @Schema())}) + }) + @GetMapping("/v1/{prefix}/databases/{database}/tables/{table}/credentials") + public GetTableCredentialsResponse listCredentials( + @PathVariable String prefix, + @PathVariable String database, + @PathVariable String table) { + return new GetTableCredentialsResponse( + System.currentTimeMillis(), ImmutableMap.of("key", "value")); + } + @Operation( summary = "List partitions", tags = {"partition"}) From 5bbe2abd17b2288370ec7bbaebcee5e7717703af Mon Sep 17 00:00:00 2001 From: yantian Date: Mon, 20 Jan 2025 16:58:43 +0800 Subject: [PATCH 6/7] remove no need code --- .../paimon/catalog/AbstractCatalog.java | 5 +-- .../org/apache/paimon/catalog/Catalog.java | 2 +- .../apache/paimon/catalog/CatalogUtils.java | 12 +++---- .../paimon/catalog/DelegateCatalog.java | 4 +-- .../org/apache/paimon/rest/RESTCatalog.java | 36 ++++++++----------- 5 files changed, 26 insertions(+), 33 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 3877e944fc78..72d09b785f1a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -92,7 +92,7 @@ public FileIO fileIO() { } @Override - public FileIO fileIO(Identifier identifier, Path path) { + public FileIO fileIO(Path path) { return fileIO; } @@ -369,7 +369,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException { SnapshotCommit.Factory commitFactory = new RenamingSnapshotCommit.Factory( lockFactory().orElse(null), lockContext().orElse(null)); - return CatalogUtils.loadTable(this, identifier, this::loadTableMetadata, commitFactory); + return CatalogUtils.loadTable( + this, identifier, fileIO(), this::loadTableMetadata, commitFactory); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 04448ef26064..a0d78b268880 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -376,7 +376,7 @@ default void repairTable(Identifier identifier) throws TableNotExistException { FileIO fileIO(); /** {@link FileIO} of this catalog. */ - FileIO fileIO(Identifier identifier, Path path); + FileIO fileIO(Path path); /** Catalog options for re-creating this catalog. */ Map options(); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index d6106e94f88f..7282b308e744 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -171,6 +171,7 @@ public static List listPartitionsFromFileSystem(Table table) { public static Table loadTable( Catalog catalog, Identifier identifier, + FileIO fileIO, TableMetadata.Loader metadataLoader, SnapshotCommit.Factory commitFactory) throws Catalog.TableNotExistException { @@ -189,12 +190,10 @@ public static Table loadTable( new CatalogEnvironment( identifier, metadata.uuid(), catalog.catalogLoader(), commitFactory); Path path = new Path(schema.options().get(PATH.key())); - FileStoreTable table = - FileStoreTableFactory.create( - catalog.fileIO(identifier, path), path, schema, catalogEnv); + FileStoreTable table = FileStoreTableFactory.create(fileIO, path, schema, catalogEnv); if (options.type() == TableType.OBJECT_TABLE) { - table = toObjectTable(catalog, identifier, table); + table = toObjectTable(catalog, table); } if (identifier.isSystemTable()) { @@ -266,11 +265,10 @@ private static FormatTable toFormatTable(Identifier identifier, TableSchema sche .build(); } - private static ObjectTable toObjectTable( - Catalog catalog, Identifier identifier, FileStoreTable underlyingTable) { + private static ObjectTable toObjectTable(Catalog catalog, FileStoreTable underlyingTable) { CoreOptions options = underlyingTable.coreOptions(); String objectLocation = options.objectLocation(); - FileIO objectFileIO = catalog.fileIO(identifier, new Path(objectLocation)); + FileIO objectFileIO = catalog.fileIO(new Path(objectLocation)); return ObjectTable.builder() .underlyingTable(underlyingTable) .objectLocation(objectLocation) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 5c12b89d1f14..847485a7a16b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -63,8 +63,8 @@ public FileIO fileIO() { } @Override - public FileIO fileIO(Identifier identifier, Path path) { - return wrapped.fileIO(identifier, path); + public FileIO fileIO(Path path) { + return wrapped.fileIO(path); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 2846e9530870..e06ef012b87a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -73,9 +73,6 @@ import org.apache.paimon.view.ViewImpl; import org.apache.paimon.view.ViewSchema; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.slf4j.Logger; @@ -114,7 +111,6 @@ public class RESTCatalog implements Catalog { private final Options options; private final boolean fileIORefreshCredentialEnable; private final FileIO fileIO; - protected Cache path2FileIO; private volatile ScheduledExecutorService refreshExecutor = null; @@ -139,12 +135,6 @@ public RESTCatalog(CatalogContext context) { options.get(RESTCatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE); try { if (fileIORefreshCredentialEnable) { - path2FileIO = - Caffeine.newBuilder() - .softValues() - .executor(Runnable::run) - .ticker(Ticker.systemTicker()) - .build(); this.fileIO = null; } else { String warehouseStr = options.get(CatalogOptions.WAREHOUSE); @@ -194,18 +184,13 @@ public FileIO fileIO() { } @Override - public FileIO fileIO(Identifier identifier, Path path) { - if (fileIORefreshCredentialEnable) { - FileIO pathFileIO = path2FileIO.getIfPresent(path); - if (pathFileIO == null) { - pathFileIO = - new RefreshCredentialFileIO( - resourcePaths, catalogAuth, options, client, identifier); - path2FileIO.put(path, pathFileIO); - } - return pathFileIO; + public FileIO fileIO(Path path) { + try { + return FileIO.get(path, CatalogContext.create(options)); + } catch (IOException e) { + LOG.warn("Can not get FileIO from options."); + throw new RuntimeException(e); } - return fileIO; } @Override @@ -321,6 +306,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException { return CatalogUtils.loadTable( this, identifier, + this.fileIO(identifier), this::loadTableMetadata, new RESTSnapshotCommitFactory(catalogLoader())); } @@ -677,4 +663,12 @@ private ScheduledExecutorService tokenRefreshExecutor() { return refreshExecutor; } + + private FileIO fileIO(Identifier identifier) { + if (fileIORefreshCredentialEnable) { + return new RefreshCredentialFileIO( + resourcePaths, catalogAuth, options, client, identifier); + } + return fileIO; + } } From a5132834402f0f224b392a0708141e2c02cef425 Mon Sep 17 00:00:00 2001 From: yantian Date: Mon, 20 Jan 2025 17:24:21 +0800 Subject: [PATCH 7/7] add test for GetTableCredentialsResponse --- .../java/org/apache/paimon/rest/MockRESTMessage.java | 7 +++++++ .../org/apache/paimon/rest/RESTObjectMapperTest.java | 11 +++++++++++ 2 files changed, 18 insertions(+) diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java index 766cb09b0bdd..a8a2d6189632 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java @@ -32,6 +32,7 @@ import org.apache.paimon.rest.responses.AlterDatabaseResponse; import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; +import org.apache.paimon.rest.responses.GetTableCredentialsResponse; import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.GetViewResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; @@ -48,6 +49,7 @@ import org.apache.paimon.view.ViewSchema; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import java.util.ArrayList; @@ -248,6 +250,11 @@ public static ListViewsResponse listViewsResponse() { return new ListViewsResponse(ImmutableList.of("view")); } + public static GetTableCredentialsResponse getTableCredentialsResponse() { + return new GetTableCredentialsResponse( + System.currentTimeMillis(), ImmutableMap.of("key", "value")); + } + private static ViewSchema viewSchema() { List fields = Arrays.asList( diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java index fa56f8111828..4c3b622a8c7f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java @@ -32,6 +32,7 @@ import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; +import org.apache.paimon.rest.responses.GetTableCredentialsResponse; import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.GetViewResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; @@ -273,4 +274,14 @@ public void listViewsResponseParseTest() throws Exception { ListViewsResponse parseData = OBJECT_MAPPER.readValue(responseStr, ListViewsResponse.class); assertEquals(response.getViews(), parseData.getViews()); } + + @Test + public void getTableCredentialsResponseParseTest() throws Exception { + GetTableCredentialsResponse response = MockRESTMessage.getTableCredentialsResponse(); + String responseStr = OBJECT_MAPPER.writeValueAsString(response); + GetTableCredentialsResponse parseData = + OBJECT_MAPPER.readValue(responseStr, GetTableCredentialsResponse.class); + assertEquals(response.getCredential(), parseData.getCredential()); + assertEquals(response.getExpiresAtMillis(), parseData.getExpiresAtMillis()); + } }