diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 02f846114e4c..40678fe47875 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -30,6 +30,7 @@ import javax.annotation.concurrent.ThreadSafe; import java.io.BufferedReader; +import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; @@ -62,7 +63,7 @@ */ @Public @ThreadSafe -public interface FileIO extends Serializable { +public interface FileIO extends Serializable, Closeable { Logger LOG = LoggerFactory.getLogger(FileIO.class); @@ -230,6 +231,13 @@ default FileStatus[] listDirectories(Path path) throws IOException { */ boolean rename(Path src, Path dst) throws IOException; + /** + * Override this method to empty, many FileIO implementation classes rely on static variables + * and do not have the ability to close them. + */ + @Override + default void close() {} + // ------------------------------------------------------------------------- // utils // ------------------------------------------------------------------------- diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 72d09b785f1a..7a72da38e76f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -91,8 +91,7 @@ public FileIO fileIO() { return fileIO; } - @Override - public FileIO fileIO(Path path) { + protected FileIO fileIO(Path path) { return fileIO; } @@ -370,7 +369,12 @@ public Table getTable(Identifier identifier) throws TableNotExistException { new RenamingSnapshotCommit.Factory( lockFactory().orElse(null), lockContext().orElse(null)); return CatalogUtils.loadTable( - this, identifier, fileIO(), this::loadTableMetadata, commitFactory); + this, + identifier, + p -> fileIO(), + this::fileIO, + this::loadTableMetadata, + commitFactory); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index a0d78b268880..d0ad86c224d7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -20,7 +20,6 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -375,9 +374,6 @@ default void repairTable(Identifier identifier) throws TableNotExistException { /** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */ FileIO fileIO(); - /** {@link FileIO} of this catalog. */ - FileIO fileIO(Path path); - /** Catalog options for re-creating this catalog. */ Map options(); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index 7282b308e744..4d1e58c1f316 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -43,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME; import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGCY_NAME; @@ -171,7 +172,8 @@ public static List listPartitionsFromFileSystem(Table table) { public static Table loadTable( Catalog catalog, Identifier identifier, - FileIO fileIO, + Function dataFileIO, + Function objectFileIO, TableMetadata.Loader metadataLoader, SnapshotCommit.Factory commitFactory) throws Catalog.TableNotExistException { @@ -190,10 +192,11 @@ public static Table loadTable( new CatalogEnvironment( identifier, metadata.uuid(), catalog.catalogLoader(), commitFactory); Path path = new Path(schema.options().get(PATH.key())); - FileStoreTable table = FileStoreTableFactory.create(fileIO, path, schema, catalogEnv); + FileStoreTable table = + FileStoreTableFactory.create(dataFileIO.apply(path), path, schema, catalogEnv); if (options.type() == TableType.OBJECT_TABLE) { - table = toObjectTable(catalog, table); + table = toObjectTable(objectFileIO, table); } if (identifier.isSystemTable()) { @@ -265,10 +268,11 @@ private static FormatTable toFormatTable(Identifier identifier, TableSchema sche .build(); } - private static ObjectTable toObjectTable(Catalog catalog, FileStoreTable underlyingTable) { + private static ObjectTable toObjectTable( + Function fileIOLoader, FileStoreTable underlyingTable) { CoreOptions options = underlyingTable.coreOptions(); String objectLocation = options.objectLocation(); - FileIO objectFileIO = catalog.fileIO(new Path(objectLocation)); + FileIO objectFileIO = fileIOLoader.apply(new Path(objectLocation)); return ObjectTable.builder() .underlyingTable(underlyingTable) .objectLocation(objectLocation) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 847485a7a16b..aa7852456e5a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -19,7 +19,6 @@ package org.apache.paimon.catalog; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -62,11 +61,6 @@ public FileIO fileIO() { return wrapped.fileIO(); } - @Override - public FileIO fileIO(Path path) { - return wrapped.fileIO(path); - } - @Override public List listDatabases() { return wrapped.listDatabases(); diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index e06ef012b87a..9c8100662b9d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -29,7 +29,6 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.operation.FileStoreCommit; -import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.rest.auth.AuthSession; @@ -56,6 +55,7 @@ import org.apache.paimon.rest.responses.ErrorResponseResourceType; import org.apache.paimon.rest.responses.GetDatabaseResponse; import org.apache.paimon.rest.responses.GetTableResponse; +import org.apache.paimon.rest.responses.GetTableTokenResponse; import org.apache.paimon.rest.responses.GetViewResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; import org.apache.paimon.rest.responses.ListPartitionsResponse; @@ -75,10 +75,8 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -95,6 +93,7 @@ import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose; import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; +import static org.apache.paimon.options.CatalogOptions.WAREHOUSE; import static org.apache.paimon.rest.RESTUtil.extractPrefixMap; import static org.apache.paimon.rest.auth.AuthSession.createAuthSession; import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool; @@ -102,97 +101,73 @@ /** A catalog implementation for REST. */ public class RESTCatalog implements Catalog { - private static final Logger LOG = LoggerFactory.getLogger(RESTCatalog.class); public static final String HEADER_PREFIX = "header."; private final RESTClient client; private final ResourcePaths resourcePaths; private final AuthSession catalogAuth; - private final Options options; - private final boolean fileIORefreshCredentialEnable; + private final CatalogContext context; + private final boolean dataTokenEnabled; private final FileIO fileIO; private volatile ScheduledExecutorService refreshExecutor = null; public RESTCatalog(CatalogContext context) { - if (context.options().getOptional(CatalogOptions.WAREHOUSE).isPresent()) { - throw new IllegalArgumentException("Can not config warehouse in RESTCatalog."); - } + this(context, true); + } + + public RESTCatalog(CatalogContext context, boolean configRequired) { this.client = new HttpClient(context.options()); this.catalogAuth = createAuthSession(context.options(), tokenRefreshExecutor()); - Map initHeaders = - RESTUtil.merge( - extractPrefixMap(context.options(), HEADER_PREFIX), - catalogAuth.getHeaders()); - this.options = - new Options( - client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, initHeaders) - .merge(context.options().toMap())); - this.resourcePaths = ResourcePaths.forCatalogProperties(options); - - this.fileIORefreshCredentialEnable = - options.get(RESTCatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE); - try { - if (fileIORefreshCredentialEnable) { - this.fileIO = null; - } else { - String warehouseStr = options.get(CatalogOptions.WAREHOUSE); - this.fileIO = - FileIO.get( - new Path(warehouseStr), - CatalogContext.create( - options, context.preferIO(), context.fallbackIO())); + Options options = context.options(); + if (configRequired) { + if (context.options().contains(WAREHOUSE)) { + throw new IllegalArgumentException("Can not config warehouse in RESTCatalog."); } - } catch (IOException e) { - LOG.warn("Can not get FileIO from options."); - throw new RuntimeException(e); + + Map initHeaders = + RESTUtil.merge( + extractPrefixMap(context.options(), HEADER_PREFIX), + catalogAuth.getHeaders()); + options = + new Options( + client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, initHeaders) + .merge(context.options().toMap())); } - } - protected RESTCatalog(Options options, FileIO fileIO) { - this.client = new HttpClient(options); - this.catalogAuth = createAuthSession(options, tokenRefreshExecutor()); - this.options = options; + context = CatalogContext.create(options, context.preferIO(), context.fallbackIO()); + this.context = context; this.resourcePaths = ResourcePaths.forCatalogProperties(options); - this.fileIO = fileIO; - this.fileIORefreshCredentialEnable = - options.get(RESTCatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE); + + this.dataTokenEnabled = options.get(RESTCatalogOptions.DATA_TOKEN_ENABLED); + this.fileIO = dataTokenEnabled ? null : fileIOFromOptions(new Path(options.get(WAREHOUSE))); } @Override public String warehouse() { - return options.get(CatalogOptions.WAREHOUSE); + return context.options().get(WAREHOUSE); } @Override public Map options() { - return options.toMap(); + return context.options().toMap(); } @Override public RESTCatalogLoader catalogLoader() { - return new RESTCatalogLoader(options, fileIO); + return new RESTCatalogLoader(context); } @Override public FileIO fileIO() { - if (fileIORefreshCredentialEnable) { + // TODO remove Catalog.fileIO + if (dataTokenEnabled) { throw new UnsupportedOperationException(); } return fileIO; } - @Override - public FileIO fileIO(Path path) { - try { - return FileIO.get(path, CatalogContext.create(options)); - } catch (IOException e) { - LOG.warn("Can not get FileIO from options."); - throw new RuntimeException(e); - } - } - @Override public List listDatabases() { ListDatabasesResponse response = @@ -306,11 +281,33 @@ public Table getTable(Identifier identifier) throws TableNotExistException { return CatalogUtils.loadTable( this, identifier, - this.fileIO(identifier), + path -> fileIOForData(path, identifier), + this::fileIOFromOptions, this::loadTableMetadata, new RESTSnapshotCommitFactory(catalogLoader())); } + private FileIO fileIOForData(Path path, Identifier identifier) { + return dataTokenEnabled + ? new RESTTokenFileIO(catalogLoader(), this, identifier, path) + : this.fileIO; + } + + private FileIO fileIOFromOptions(Path path) { + try { + return FileIO.get(path, context); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + protected GetTableTokenResponse loadTableToken(Identifier identifier) { + return client.get( + resourcePaths.tableToken(identifier.getDatabaseName(), identifier.getObjectName()), + GetTableTokenResponse.class, + catalogAuth.getHeaders()); + } + public boolean commitSnapshot(Identifier identifier, Snapshot snapshot) { CommitTableRequest request = new CommitTableRequest(identifier, snapshot); CommitTableResponse response = @@ -630,7 +627,7 @@ public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfN @Override public boolean caseSensitive() { - return options.getOptional(CASE_SENSITIVE).orElse(true); + return context.options().getOptional(CASE_SENSITIVE).orElse(true); } @Override @@ -663,12 +660,4 @@ private ScheduledExecutorService tokenRefreshExecutor() { return refreshExecutor; } - - private FileIO fileIO(Identifier identifier) { - if (fileIORefreshCredentialEnable) { - return new RefreshCredentialFileIO( - resourcePaths, catalogAuth, options, client, identifier); - } - return fileIO; - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java index f90988d05cb2..efc5a0b46ca4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java @@ -18,25 +18,26 @@ package org.apache.paimon.rest; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogLoader; -import org.apache.paimon.fs.FileIO; -import org.apache.paimon.options.Options; /** Loader to create {@link RESTCatalog}. */ public class RESTCatalogLoader implements CatalogLoader { private static final long serialVersionUID = 1L; - private final Options options; - private final FileIO fileIO; + private final CatalogContext context; - public RESTCatalogLoader(Options options, FileIO fileIO) { - this.options = options; - this.fileIO = fileIO; + public RESTCatalogLoader(CatalogContext context) { + this.context = context; + } + + public CatalogContext context() { + return context; } @Override public RESTCatalog load() { - return new RESTCatalog(options, fileIO); + return new RESTCatalog(context, false); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index 61aed5f7038f..35b72469bd9a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -78,9 +78,9 @@ public class RESTCatalogOptions { .noDefaultValue() .withDescription("REST Catalog auth token provider path."); - public static final ConfigOption FILE_IO_REFRESH_CREDENTIAL_ENABLE = - ConfigOptions.key("file-io-refresh-credential.enabled") + public static final ConfigOption DATA_TOKEN_ENABLED = + ConfigOptions.key("data-token.enabled") .booleanType() .defaultValue(false) - .withDescription("Whether to support file io refresh credential."); + .withDescription("Whether to support data token provided by the REST server."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java new file mode 100644 index 000000000000..220d0eaaee9b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest; + +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.FileStatus; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.options.Options; +import org.apache.paimon.rest.responses.GetTableTokenResponse; +import org.apache.paimon.utils.ThreadUtils; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** A {@link FileIO} to support getting token from REST Server. */ +public class RESTTokenFileIO implements FileIO { + + private static final long serialVersionUID = 1L; + + private static final Cache FILE_IO_CACHE = + Caffeine.newBuilder() + .expireAfterAccess(30, TimeUnit.MINUTES) + .maximumSize(100) + .removalListener( + (ignored, value, cause) -> { + if (value != null) { + ((FileIO) value).close(); + } + }) + .scheduler( + Scheduler.forScheduledExecutorService( + Executors.newSingleThreadScheduledExecutor( + ThreadUtils.newDaemonThreadFactory( + "rest-token-file-io-scheduler")))) + .build(); + + private final RESTCatalogLoader catalogLoader; + private final Identifier identifier; + private final Path path; + + // catalog instance before serialization, it will become null after serialization, then we + // should create catalog from catalog loader + private final transient RESTCatalog catalogInstance; + + // the latest token from REST Server, serializable in order to avoid loading token from the REST + // Server again after serialization + private volatile Token token; + + public RESTTokenFileIO( + RESTCatalogLoader catalogLoader, + RESTCatalog catalogInstance, + Identifier identifier, + Path path) { + this.catalogLoader = catalogLoader; + this.catalogInstance = catalogInstance; + this.identifier = identifier; + this.path = path; + } + + @Override + public void configure(CatalogContext context) { + throw new UnsupportedOperationException("RESTTokenFileIO does not support configuration."); + } + + @Override + public SeekableInputStream newInputStream(Path path) throws IOException { + return fileIO().newInputStream(path); + } + + @Override + public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException { + return fileIO().newOutputStream(path, overwrite); + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + return fileIO().getFileStatus(path); + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + return fileIO().listStatus(path); + } + + @Override + public boolean exists(Path path) throws IOException { + return fileIO().exists(path); + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + return fileIO().delete(path, recursive); + } + + @Override + public boolean mkdirs(Path path) throws IOException { + return fileIO().mkdirs(path); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + return fileIO().rename(src, dst); + } + + @Override + public boolean isObjectStore() { + try { + return fileIO().isObjectStore(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private FileIO fileIO() throws IOException { + if (shouldRefresh()) { + synchronized (this) { + if (shouldRefresh()) { + refreshToken(); + } + } + } + + FileIO fileIO = FILE_IO_CACHE.getIfPresent(token); + if (fileIO != null) { + return fileIO; + } + + synchronized (FILE_IO_CACHE) { + fileIO = FILE_IO_CACHE.getIfPresent(token); + if (fileIO != null) { + return fileIO; + } + + CatalogContext context = catalogLoader.context(); + Options options = context.options(); + options = new Options(RESTUtil.merge(options.toMap(), token.token)); + context = CatalogContext.create(options, context.preferIO(), context.fallbackIO()); + try { + fileIO = FileIO.get(path, context); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + FILE_IO_CACHE.put(token, fileIO); + return fileIO; + } + } + + private boolean shouldRefresh() { + return token == null || System.currentTimeMillis() > token.expireAtMillis; + } + + private void refreshToken() { + GetTableTokenResponse response; + if (catalogInstance != null) { + response = catalogInstance.loadTableToken(identifier); + } else { + try (RESTCatalog catalog = catalogLoader.load()) { + response = catalog.loadTableToken(identifier); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + token = new Token(response.getToken(), response.getExpiresAtMillis()); + } + + private static class Token implements Serializable { + + private static final long serialVersionUID = 1L; + + private final Map token; + private final long expireAtMillis; + + /** Cache the hash code. */ + @Nullable private Integer hash; + + private Token(Map token, long expireAtMillis) { + this.token = token; + this.expireAtMillis = expireAtMillis; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + Token token1 = (Token) o; + return expireAtMillis == token1.expireAtMillis && Objects.equals(token, token1.token); + } + + @Override + public int hashCode() { + if (hash == null) { + hash = Objects.hash(token, expireAtMillis); + } + return hash; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java b/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java deleted file mode 100644 index b55486887d55..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.rest; - -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.FileStatus; -import org.apache.paimon.fs.Path; -import org.apache.paimon.fs.PositionOutputStream; -import org.apache.paimon.fs.SeekableInputStream; -import org.apache.paimon.options.CatalogOptions; -import org.apache.paimon.options.Options; -import org.apache.paimon.rest.auth.AuthSession; -import org.apache.paimon.rest.responses.GetTableCredentialsResponse; - -import java.io.IOException; -import java.util.Map; - -/** A {@link FileIO} to support refresh credential. */ -public class RefreshCredentialFileIO implements FileIO { - - private static final long serialVersionUID = 1L; - - private final ResourcePaths resourcePaths; - private final AuthSession catalogAuth; - protected Options options; - private final Identifier identifier; - private Long expireAtMillis; - private Map credential; - private final transient RESTClient client; - private transient volatile FileIO lazyFileIO; - - public RefreshCredentialFileIO( - ResourcePaths resourcePaths, - AuthSession catalogAuth, - Options options, - RESTClient client, - Identifier identifier) { - this.resourcePaths = resourcePaths; - this.catalogAuth = catalogAuth; - this.options = options; - this.identifier = identifier; - this.client = client; - } - - @Override - public void configure(CatalogContext context) { - this.options = context.options(); - } - - @Override - public SeekableInputStream newInputStream(Path path) throws IOException { - return fileIO().newInputStream(path); - } - - @Override - public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException { - return fileIO().newOutputStream(path, overwrite); - } - - @Override - public FileStatus getFileStatus(Path path) throws IOException { - return fileIO().getFileStatus(path); - } - - @Override - public FileStatus[] listStatus(Path path) throws IOException { - return fileIO().listStatus(path); - } - - @Override - public boolean exists(Path path) throws IOException { - return fileIO().exists(path); - } - - @Override - public boolean delete(Path path, boolean recursive) throws IOException { - return fileIO().delete(path, recursive); - } - - @Override - public boolean mkdirs(Path path) throws IOException { - return fileIO().mkdirs(path); - } - - @Override - public boolean rename(Path src, Path dst) throws IOException { - return fileIO().rename(src, dst); - } - - @Override - public boolean isObjectStore() { - try { - return fileIO().isObjectStore(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private FileIO fileIO() throws IOException { - if (lazyFileIO == null || shouldRefresh()) { - synchronized (this) { - if (lazyFileIO == null || shouldRefresh()) { - GetTableCredentialsResponse response = getCredential(); - expireAtMillis = response.getExpiresAtMillis(); - credential = response.getCredential(); - Map conf = RESTUtil.merge(options.toMap(), credential); - Options updateCredentialOption = new Options(conf); - lazyFileIO = - FileIO.get( - new Path(updateCredentialOption.get(CatalogOptions.WAREHOUSE)), - CatalogContext.create(updateCredentialOption)); - } - } - } - return lazyFileIO; - } - - // todo: handle exception - private GetTableCredentialsResponse getCredential() { - return client.get( - resourcePaths.tableCredentials( - identifier.getDatabaseName(), identifier.getObjectName()), - GetTableCredentialsResponse.class, - catalogAuth.getHeaders()); - } - - private boolean shouldRefresh() { - return expireAtMillis != null && expireAtMillis > System.currentTimeMillis(); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java index 1e843f99cb0e..69560f36e3af 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java @@ -70,8 +70,8 @@ public String commitTable(String databaseName) { return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, "commit"); } - public String tableCredentials(String databaseName, String tableName) { - return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "credentials"); + public String tableToken(String databaseName, String tableName) { + return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "token"); } public String partitions(String databaseName, String tableName) { diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableTokenResponse.java similarity index 65% rename from paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java rename to paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableTokenResponse.java index 2792940ff6b1..5a279a386882 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableTokenResponse.java @@ -27,33 +27,33 @@ import java.util.Map; -/** Response for table credentials. */ +/** Response for table token. */ @JsonIgnoreProperties(ignoreUnknown = true) -public class GetTableCredentialsResponse implements RESTResponse { +public class GetTableTokenResponse implements RESTResponse { - private static final String FIELD_CREDENTIAL = "credential"; - private static final String FIELD_EXPIREAT_MILLIS = "expiresAtMillis"; + private static final String FIELD_TOKEN = "token"; + private static final String FIELD_EXPIRES_AT_MILLIS = "expiresAtMillis"; - @JsonProperty(FIELD_CREDENTIAL) - private final Map credential; + @JsonProperty(FIELD_TOKEN) + private final Map token; - @JsonProperty(FIELD_EXPIREAT_MILLIS) - private long expiresAtMillis; + @JsonProperty(FIELD_EXPIRES_AT_MILLIS) + private final long expiresAtMillis; @JsonCreator - public GetTableCredentialsResponse( - @JsonProperty(FIELD_EXPIREAT_MILLIS) long expiresAtMillis, - @JsonProperty(FIELD_CREDENTIAL) Map credential) { + public GetTableTokenResponse( + @JsonProperty(FIELD_TOKEN) Map token, + @JsonProperty(FIELD_EXPIRES_AT_MILLIS) long expiresAtMillis) { + this.token = token; this.expiresAtMillis = expiresAtMillis; - this.credential = credential; } - @JsonGetter(FIELD_CREDENTIAL) - public Map getCredential() { - return credential; + @JsonGetter(FIELD_TOKEN) + public Map getToken() { + return token; } - @JsonGetter(FIELD_EXPIREAT_MILLIS) + @JsonGetter(FIELD_EXPIRES_AT_MILLIS) public long getExpiresAtMillis() { return expiresAtMillis; } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java index 576f494b8871..83fab9a6876c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java @@ -32,8 +32,8 @@ import org.apache.paimon.rest.responses.AlterDatabaseResponse; import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; -import org.apache.paimon.rest.responses.GetTableCredentialsResponse; import org.apache.paimon.rest.responses.GetTableResponse; +import org.apache.paimon.rest.responses.GetTableTokenResponse; import org.apache.paimon.rest.responses.GetViewResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; import org.apache.paimon.rest.responses.ListPartitionsResponse; @@ -250,9 +250,9 @@ public static ListViewsResponse listViewsResponse() { return new ListViewsResponse(ImmutableList.of("view")); } - public static GetTableCredentialsResponse getTableCredentialsResponse() { - return new GetTableCredentialsResponse( - System.currentTimeMillis(), ImmutableMap.of("key", "value")); + public static GetTableTokenResponse getTableCredentialsResponse() { + return new GetTableTokenResponse( + ImmutableMap.of("key", "value"), System.currentTimeMillis()); } private static ViewSchema viewSchema() { diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java index 22fde48e99fe..05f3adb3c419 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java @@ -42,8 +42,8 @@ import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.ErrorResponseResourceType; import org.apache.paimon.rest.responses.GetDatabaseResponse; -import org.apache.paimon.rest.responses.GetTableCredentialsResponse; import org.apache.paimon.rest.responses.GetTableResponse; +import org.apache.paimon.rest.responses.GetTableTokenResponse; import org.apache.paimon.rest.responses.GetViewResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; import org.apache.paimon.rest.responses.ListPartitionsResponse; @@ -151,10 +151,10 @@ public MockResponse dispatch(RecordedRequest request) { resources.length == 3 && "tables".equals(resources[1]) && "commit".equals(resources[2]); - boolean isTableCredentials = + boolean isTableToken = resources.length == 4 && "tables".equals(resources[1]) - && "credentials".equals(resources[3]); + && "token".equals(resources[3]); boolean isPartitions = resources.length == 4 && "tables".equals(resources[1]) @@ -208,16 +208,16 @@ public MockResponse dispatch(RecordedRequest request) { } else if (isPartitions) { String tableName = resources[2]; return partitionsApiHandler(catalog, request, databaseName, tableName); - } else if (isTableCredentials) { - GetTableCredentialsResponse getTableCredentialsResponse = - new GetTableCredentialsResponse( - System.currentTimeMillis(), - ImmutableMap.of("key", "value")); + } else if (isTableToken) { + GetTableTokenResponse getTableTokenResponse = + new GetTableTokenResponse( + ImmutableMap.of("key", "value"), + System.currentTimeMillis()); return new MockResponse() .setResponseCode(200) .setBody( OBJECT_MAPPER.writeValueAsString( - getTableCredentialsResponse)); + getTableTokenResponse)); } else if (isTableRename) { return renameTableApiHandler(catalog, request); } else if (isTableCommit) { diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 752f492078b7..1fb4dcbcd7ed 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -114,7 +114,7 @@ void testRefreshFileIO() throws Exception { options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl()); options.set(RESTCatalogOptions.TOKEN, initToken); options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1); - options.set(RESTCatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE, true); + options.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true); this.catalog = new RESTCatalog(CatalogContext.create(options)); List identifiers = Lists.newArrayList( diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java index 4c3b622a8c7f..4d9015ea77ea 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java @@ -32,8 +32,8 @@ import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; -import org.apache.paimon.rest.responses.GetTableCredentialsResponse; import org.apache.paimon.rest.responses.GetTableResponse; +import org.apache.paimon.rest.responses.GetTableTokenResponse; import org.apache.paimon.rest.responses.GetViewResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; import org.apache.paimon.rest.responses.ListPartitionsResponse; @@ -276,12 +276,12 @@ public void listViewsResponseParseTest() throws Exception { } @Test - public void getTableCredentialsResponseParseTest() throws Exception { - GetTableCredentialsResponse response = MockRESTMessage.getTableCredentialsResponse(); + public void getTableTokenResponseParseTest() throws Exception { + GetTableTokenResponse response = MockRESTMessage.getTableCredentialsResponse(); String responseStr = OBJECT_MAPPER.writeValueAsString(response); - GetTableCredentialsResponse parseData = - OBJECT_MAPPER.readValue(responseStr, GetTableCredentialsResponse.class); - assertEquals(response.getCredential(), parseData.getCredential()); + GetTableTokenResponse parseData = + OBJECT_MAPPER.readValue(responseStr, GetTableTokenResponse.class); + assertEquals(response.getToken(), parseData.getToken()); assertEquals(response.getExpiresAtMillis(), parseData.getExpiresAtMillis()); } } diff --git a/paimon-open-api/rest-catalog-open-api.yaml b/paimon-open-api/rest-catalog-open-api.yaml index 128514d7a59e..c5c4434402ad 100644 --- a/paimon-open-api/rest-catalog-open-api.yaml +++ b/paimon-open-api/rest-catalog-open-api.yaml @@ -432,12 +432,12 @@ paths: $ref: '#/components/schemas/ErrorResponse' "500": description: Internal Server Error - /v1/{prefix}/databases/{database}/tables/{table}/credentials: + /v1/{prefix}/databases/{database}/tables/{table}/token: get: tags: - table - summary: List credentials - operationId: listCredentials + summary: Get table token + operationId: getTableToken parameters: - name: prefix in: path @@ -460,7 +460,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/GetTableCredentialsResponse' + $ref: '#/components/schemas/GetTableDataTokenResponse' "404": description: Resource not found content: @@ -1203,16 +1203,16 @@ components: properties: success: type: boolean - GetTableCredentialsResponse: + GetTableDataTokenResponse: type: object properties: - expiresAt: - type: integer - format: int64 - credentials: + token: type: object additionalProperties: type: string + expiresAt: + type: integer + format: int64 AlterDatabaseRequest: type: object properties: diff --git a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java index c8eae97dec64..3ffec9a5c307 100644 --- a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java +++ b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java @@ -37,8 +37,8 @@ import org.apache.paimon.rest.responses.CreateDatabaseResponse; import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; -import org.apache.paimon.rest.responses.GetTableCredentialsResponse; import org.apache.paimon.rest.responses.GetTableResponse; +import org.apache.paimon.rest.responses.GetTableTokenResponse; import org.apache.paimon.rest.responses.GetViewResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; import org.apache.paimon.rest.responses.ListPartitionsResponse; @@ -358,13 +358,13 @@ public CommitTableResponse commitTable( } @Operation( - summary = "List credentials", + summary = "Get table token", tags = {"table"}) @ApiResponses({ @ApiResponse( responseCode = "200", content = { - @Content(schema = @Schema(implementation = GetTableCredentialsResponse.class)) + @Content(schema = @Schema(implementation = GetTableTokenResponse.class)) }), @ApiResponse( responseCode = "404", @@ -374,13 +374,13 @@ public CommitTableResponse commitTable( responseCode = "500", content = {@Content(schema = @Schema())}) }) - @GetMapping("/v1/{prefix}/databases/{database}/tables/{table}/credentials") - public GetTableCredentialsResponse listCredentials( + @GetMapping("/v1/{prefix}/databases/{database}/tables/{table}/token") + public GetTableTokenResponse getTableToken( @PathVariable String prefix, @PathVariable String database, @PathVariable String table) { - return new GetTableCredentialsResponse( - System.currentTimeMillis(), ImmutableMap.of("key", "value")); + return new GetTableTokenResponse( + ImmutableMap.of("key", "value"), System.currentTimeMillis()); } @Operation(