diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java index f8959def67d1..c64b9e26ea6e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java @@ -20,6 +20,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; +import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; import javax.annotation.Nullable; @@ -36,6 +37,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -76,6 +79,13 @@ public static ThreadPoolExecutor createCachedThreadPool( return executor; } + public static ScheduledExecutorService createScheduledThreadPool( + int threadNum, String namePrefix) { + return new ScheduledThreadPoolExecutor( + threadNum, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix).build()); + } + /** This method aims to parallel process tasks with memory control and sequentially. */ public static Iterable sequentialBatchedExecute( ThreadPoolExecutor executor, diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index c96400831370..e18946b3374b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -18,7 +18,6 @@ package org.apache.paimon.rest; -import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; @@ -27,37 +26,42 @@ import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; +import org.apache.paimon.rest.auth.AuthSession; +import org.apache.paimon.rest.auth.CredentialsProvider; +import org.apache.paimon.rest.auth.CredentialsProviderFactory; import org.apache.paimon.rest.responses.ConfigResponse; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; -import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; +import org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; + +import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool; /** A catalog implementation for REST. */ public class RESTCatalog implements Catalog { private RESTClient client; - private String token; private ResourcePaths resourcePaths; private Map options; private Map baseHeader; + // a lazy thread pool for token refresh + private final AuthSession catalogAuth; + private volatile ScheduledExecutorService refreshExecutor = null; private static final ObjectMapper objectMapper = RESTObjectMapper.create(); - static final String AUTH_HEADER = "Authorization"; - static final String AUTH_HEADER_VALUE_FORMAT = "Bearer %s"; public RESTCatalog(Options options) { if (options.getOptional(CatalogOptions.WAREHOUSE).isPresent()) { throw new IllegalArgumentException("Can not config warehouse in RESTCatalog."); } String uri = options.get(RESTCatalogOptions.URI); - token = options.get(RESTCatalogOptions.TOKEN); Optional connectTimeout = options.getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT); Optional readTimeout = options.getOptional(RESTCatalogOptions.READ_TIMEOUT); @@ -71,12 +75,21 @@ public RESTCatalog(Options options) { threadPoolSize, DefaultErrorHandler.getInstance()); this.client = new HttpClient(httpClientOptions); - Map authHeaders = - ImmutableMap.of(AUTH_HEADER, String.format(AUTH_HEADER_VALUE_FORMAT, token)); + this.baseHeader = configHeaders(options.toMap()); + CredentialsProvider credentialsProvider = + CredentialsProviderFactory.createCredentialsProvider( + options, RESTCatalog.class.getClassLoader()); + if (credentialsProvider.keepRefreshed()) { + this.catalogAuth = + AuthSession.fromRefreshCredentialsProvider( + tokenRefreshExecutor(), this.baseHeader, credentialsProvider); + + } else { + this.catalogAuth = new AuthSession(this.baseHeader, credentialsProvider); + } Map initHeaders = - RESTUtil.merge(configHeaders(options.toMap()), authHeaders); + RESTUtil.merge(configHeaders(options.toMap()), this.catalogAuth.getHeaders()); this.options = fetchOptionsFromServer(initHeaders, options.toMap()); - this.baseHeader = configHeaders(this.options()); this.resourcePaths = ResourcePaths.forCatalogProperties( this.options.get(RESTCatalogInternalOptions.PREFIX)); @@ -187,11 +200,27 @@ public void close() throws Exception {} Map fetchOptionsFromServer( Map headers, Map clientProperties) { ConfigResponse response = - client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, headers); + client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, headers()); return response.merge(clientProperties); } private static Map configHeaders(Map properties) { return RESTUtil.extractPrefixMap(properties, "header."); } + + private Map headers() { + return catalogAuth.getHeaders(); + } + + private ScheduledExecutorService tokenRefreshExecutor() { + if (refreshExecutor == null) { + synchronized (this) { + if (refreshExecutor == null) { + this.refreshExecutor = createScheduledThreadPool(1, "token-refresh-thread"); + } + } + } + + return refreshExecutor; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java index cf61caa20e88..62a8bf134ae5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogInternalOptions.java @@ -28,4 +28,9 @@ public class RESTCatalogInternalOptions { .stringType() .noDefaultValue() .withDescription("REST Catalog uri's prefix."); + public static final ConfigOption CREDENTIALS_PROVIDER = + ConfigOptions.key("credentials-provider") + .stringType() + .noDefaultValue() + .withDescription("REST Catalog auth credentials provider."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index 6155b893751b..8f7bea91dcd3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -30,11 +30,6 @@ public class RESTCatalogOptions { .stringType() .noDefaultValue() .withDescription("REST Catalog server's uri."); - public static final ConfigOption TOKEN = - ConfigOptions.key("token") - .stringType() - .noDefaultValue() - .withDescription("REST Catalog server's auth token."); public static final ConfigOption CONNECTION_TIMEOUT = ConfigOptions.key("rest.client.connection-timeout") .durationType() @@ -50,4 +45,23 @@ public class RESTCatalogOptions { .intType() .defaultValue(1) .withDescription("REST Catalog http client thread num."); + public static final ConfigOption TOKEN = + ConfigOptions.key("token") + .stringType() + .noDefaultValue() + .withDescription("REST Catalog auth token."); + public static final ConfigOption TOKEN_EXPIRATION_TIME = + ConfigOptions.key("token.expiration-time") + .durationType() + .defaultValue(Duration.ofHours(1)) + .withDescription( + "REST Catalog auth token expires time.The token generates system refresh frequency is t1," + + " the token expires time is t2, we need to guarantee that t2 > t1," + + " the token validity time is [t2 - t1, t2]," + + " and the expires time defined here needs to be less than (t2 - t1)"); + public static final ConfigOption TOKEN_PROVIDER_PATH = + ConfigOptions.key("token.provider.path") + .stringType() + .noDefaultValue() + .withDescription("REST Catalog auth token provider path."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java new file mode 100644 index 000000000000..74efb8508a06 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/AuthSession.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.auth; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.rest.RESTUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** Auth session. */ +public class AuthSession { + + static final int TOKEN_REFRESH_NUM_RETRIES = 5; + private static final Logger log = LoggerFactory.getLogger(AuthSession.class); + private static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes + private static final long MIN_REFRESH_WAIT_MILLIS = 10; + private final CredentialsProvider credentialsProvider; + private volatile Map headers; + + public AuthSession(Map headers, CredentialsProvider credentialsProvider) { + this.headers = headers; + this.credentialsProvider = credentialsProvider; + } + + public static AuthSession fromRefreshCredentialsProvider( + ScheduledExecutorService executor, + Map headers, + CredentialsProvider credentialsProvider) { + AuthSession session = new AuthSession(headers, credentialsProvider); + + long startTimeMillis = System.currentTimeMillis(); + Optional expiresAtMillisOpt = credentialsProvider.expiresAtMillis(); + + // when init session if credentials expire time is in the past, refresh it and update + // expiresAtMillis + if (expiresAtMillisOpt.isPresent() && expiresAtMillisOpt.get() <= startTimeMillis) { + boolean refreshSuccessful = session.refresh(); + if (refreshSuccessful) { + expiresAtMillisOpt = session.credentialsProvider.expiresAtMillis(); + } + } + + if (null != executor && expiresAtMillisOpt.isPresent()) { + scheduleTokenRefresh(executor, session, expiresAtMillisOpt.get()); + } + + return session; + } + + public Map getHeaders() { + if (this.credentialsProvider.keepRefreshed() && this.credentialsProvider.willSoonExpire()) { + refresh(); + } + return headers; + } + + @VisibleForTesting + static void scheduleTokenRefresh( + ScheduledExecutorService executor, AuthSession session, long expiresAtMillis) { + scheduleTokenRefresh(executor, session, expiresAtMillis, 0); + } + + private static void scheduleTokenRefresh( + ScheduledExecutorService executor, + AuthSession session, + long expiresAtMillis, + int retryTimes) { + if (retryTimes < TOKEN_REFRESH_NUM_RETRIES) { + long expiresInMillis = expiresAtMillis - System.currentTimeMillis(); + // how much ahead of time to start the refresh to allow it to complete + long refreshWindowMillis = Math.min(expiresInMillis, MAX_REFRESH_WINDOW_MILLIS); + // how much time to wait before expiration + long waitIntervalMillis = expiresInMillis - refreshWindowMillis; + // how much time to actually wait + long timeToWait = Math.max(waitIntervalMillis, MIN_REFRESH_WAIT_MILLIS); + + executor.schedule( + () -> { + long refreshStartTime = System.currentTimeMillis(); + boolean isSuccessful = session.refresh(); + if (isSuccessful) { + scheduleTokenRefresh( + executor, + session, + refreshStartTime + + session.credentialsProvider.expiresInMills().get(), + 0); + } else { + scheduleTokenRefresh( + executor, session, expiresAtMillis, retryTimes + 1); + } + }, + timeToWait, + TimeUnit.MILLISECONDS); + } else { + log.warn("Failed to refresh token after {} retries.", TOKEN_REFRESH_NUM_RETRIES); + } + } + + public Boolean refresh() { + if (this.credentialsProvider.supportRefresh() + && this.credentialsProvider.keepRefreshed() + && this.credentialsProvider.expiresInMills().isPresent()) { + boolean isSuccessful = this.credentialsProvider.refresh(); + if (isSuccessful) { + Map currentHeaders = this.headers; + this.headers = + RESTUtil.merge(currentHeaders, this.credentialsProvider.authHeader()); + } + return isSuccessful; + } + + return false; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BaseBearTokenCredentialsProvider.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BaseBearTokenCredentialsProvider.java new file mode 100644 index 000000000000..d3df87826164 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BaseBearTokenCredentialsProvider.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.auth; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; + +import java.util.Map; + +/** Base bear token credentials provider. */ +public abstract class BaseBearTokenCredentialsProvider implements CredentialsProvider { + + private static final String AUTHORIZATION_HEADER = "Authorization"; + private static final String BEARER_PREFIX = "Bearer "; + + @Override + public Map authHeader() { + return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token()); + } + + abstract String token(); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenCredentialsProvider.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenCredentialsProvider.java new file mode 100644 index 000000000000..89228fe10b28 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenCredentialsProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.auth; + +/** credentials provider for bear token. */ +public class BearTokenCredentialsProvider extends BaseBearTokenCredentialsProvider { + + private final String token; + + public BearTokenCredentialsProvider(String token) { + this.token = token; + } + + @Override + String token() { + return this.token; + } + + @Override + public boolean refresh() { + return true; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenCredentialsProviderFactory.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenCredentialsProviderFactory.java new file mode 100644 index 000000000000..e63ac5606b01 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenCredentialsProviderFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.auth; + +import org.apache.paimon.options.Options; +import org.apache.paimon.rest.RESTCatalogOptions; +import org.apache.paimon.utils.StringUtils; + +/** factory for create {@link BearTokenCredentialsProvider}. */ +public class BearTokenCredentialsProviderFactory implements CredentialsProviderFactory { + + @Override + public String identifier() { + return CredentialsProviderType.BEAR_TOKEN.name(); + } + + @Override + public CredentialsProvider create(Options options) { + if (options.getOptional(RESTCatalogOptions.TOKEN) + .map(StringUtils::isNullOrWhitespaceOnly) + .orElse(true)) { + throw new IllegalArgumentException( + RESTCatalogOptions.TOKEN.key() + " is required and not empty"); + } + return new BearTokenCredentialsProvider(options.get(RESTCatalogOptions.TOKEN)); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileCredentialsProvider.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileCredentialsProvider.java new file mode 100644 index 000000000000..d479caa67fd0 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileCredentialsProvider.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.auth; + +import org.apache.paimon.utils.FileIOUtils; +import org.apache.paimon.utils.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Optional; + +/** credentials provider for get bear token from file. */ +public class BearTokenFileCredentialsProvider extends BaseBearTokenCredentialsProvider { + + public static final double EXPIRED_FACTOR = 0.4; + + private final String tokenFilePath; + private String token; + private boolean keepRefreshed = false; + private Long expiresAtMillis = null; + private Long expiresInMills = null; + + public BearTokenFileCredentialsProvider(String tokenFilePath) { + this.tokenFilePath = tokenFilePath; + this.token = getTokenFromFile(); + } + + public BearTokenFileCredentialsProvider(String tokenFilePath, Long expiresInMills) { + this(tokenFilePath); + this.keepRefreshed = true; + this.expiresAtMillis = -1L; + this.expiresInMills = expiresInMills; + } + + @Override + String token() { + return this.token; + } + + @Override + public boolean refresh() { + long start = System.currentTimeMillis(); + String newToken = getTokenFromFile(); + if (StringUtils.isNullOrWhitespaceOnly(newToken)) { + return false; + } + this.expiresAtMillis = start + this.expiresInMills; + this.token = newToken; + return true; + } + + @Override + public boolean supportRefresh() { + return true; + } + + @Override + public boolean keepRefreshed() { + return this.keepRefreshed; + } + + @Override + public boolean willSoonExpire() { + if (keepRefreshed()) { + return expiresAtMillis().get() - System.currentTimeMillis() + < expiresInMills().get() * EXPIRED_FACTOR; + } else { + return false; + } + } + + @Override + public Optional expiresAtMillis() { + return Optional.ofNullable(this.expiresAtMillis); + } + + @Override + public Optional expiresInMills() { + return Optional.ofNullable(this.expiresInMills); + } + + private String getTokenFromFile() { + try { + return FileIOUtils.readFileUtf8(new File(tokenFilePath)); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileCredentialsProviderFactory.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileCredentialsProviderFactory.java new file mode 100644 index 000000000000..a0fa6b405d62 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/BearTokenFileCredentialsProviderFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.auth; + +import org.apache.paimon.options.Options; + +import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_EXPIRATION_TIME; +import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_PROVIDER_PATH; + +/** factory for create {@link BearTokenCredentialsProvider}. */ +public class BearTokenFileCredentialsProviderFactory implements CredentialsProviderFactory { + + @Override + public String identifier() { + return CredentialsProviderType.BEAR_TOKEN_FILE.name(); + } + + @Override + public CredentialsProvider create(Options options) { + if (!options.getOptional(TOKEN_PROVIDER_PATH).isPresent()) { + throw new IllegalArgumentException(TOKEN_PROVIDER_PATH.key() + " is required"); + } + String tokenFilePath = options.get(TOKEN_PROVIDER_PATH); + if (options.getOptional(TOKEN_EXPIRATION_TIME).isPresent()) { + long tokenExpireInMills = options.get(TOKEN_EXPIRATION_TIME).toMillis(); + return new BearTokenFileCredentialsProvider(tokenFilePath, tokenExpireInMills); + + } else { + return new BearTokenFileCredentialsProvider(tokenFilePath); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProvider.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProvider.java new file mode 100644 index 000000000000..7fe8008e5947 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProvider.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.auth; + +import java.util.Map; +import java.util.Optional; + +/** Credentials provider. */ +public interface CredentialsProvider { + + Map authHeader(); + + boolean refresh(); + + default boolean supportRefresh() { + return false; + } + + default boolean keepRefreshed() { + return false; + } + + default boolean willSoonExpire() { + return false; + } + + default Optional expiresAtMillis() { + return Optional.empty(); + } + + default Optional expiresInMills() { + return Optional.empty(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProviderFactory.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProviderFactory.java new file mode 100644 index 000000000000..50c3564ad8c6 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProviderFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.auth; + +import org.apache.paimon.factories.Factory; +import org.apache.paimon.factories.FactoryUtil; +import org.apache.paimon.options.Options; +import org.apache.paimon.rest.RESTCatalogOptions; + +import static org.apache.paimon.rest.RESTCatalogInternalOptions.CREDENTIALS_PROVIDER; + +/** Factory for creating {@link CredentialsProvider}. */ +public interface CredentialsProviderFactory extends Factory { + + default CredentialsProvider create(Options options) { + throw new UnsupportedOperationException( + "Use create(context) for " + this.getClass().getSimpleName()); + } + + static CredentialsProvider createCredentialsProvider(Options options, ClassLoader classLoader) { + String credentialsProviderIdentifier = getCredentialsProviderTypeByConf(options).name(); + CredentialsProviderFactory credentialsProviderFactory = + FactoryUtil.discoverFactory( + classLoader, + CredentialsProviderFactory.class, + credentialsProviderIdentifier); + return credentialsProviderFactory.create(options); + } + + static CredentialsProviderType getCredentialsProviderTypeByConf(Options options) { + if (options.getOptional(CREDENTIALS_PROVIDER).isPresent()) { + return CredentialsProviderType.valueOf(options.get(CREDENTIALS_PROVIDER)); + } else if (options.getOptional(RESTCatalogOptions.TOKEN_PROVIDER_PATH).isPresent()) { + return CredentialsProviderType.BEAR_TOKEN_FILE; + } + return CredentialsProviderType.BEAR_TOKEN; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProviderType.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProviderType.java new file mode 100644 index 000000000000..28c344d70eee --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/CredentialsProviderType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.auth; + +/** Credentials provider type. */ +public enum CredentialsProviderType { + BEAR_TOKEN, + BEAR_TOKEN_FILE +} diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 3b98eef52c85..6416edd720f8 100644 --- a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -37,3 +37,5 @@ org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap64AggFac org.apache.paimon.mergetree.compact.aggregate.factory.FieldSumAggFactory org.apache.paimon.mergetree.compact.aggregate.factory.FieldThetaSketchAggFactory org.apache.paimon.rest.RESTCatalogFactory +org.apache.paimon.rest.auth.BearTokenCredentialsProviderFactory +org.apache.paimon.rest.auth.BearTokenFileCredentialsProviderFactory diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java index 1140e399824c..17c13b932fd2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java @@ -18,7 +18,9 @@ package org.apache.paimon.rest; -import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; +import org.apache.paimon.rest.auth.BearTokenCredentialsProvider; +import org.apache.paimon.rest.auth.CredentialsProvider; + import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import okhttp3.mockwebserver.MockResponse; @@ -33,8 +35,6 @@ import java.util.Map; import java.util.Optional; -import static org.apache.paimon.rest.RESTCatalog.AUTH_HEADER; -import static org.apache.paimon.rest.RESTCatalog.AUTH_HEADER_VALUE_FORMAT; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -70,7 +70,8 @@ public void setUp() throws IOException { mockResponseData = new MockRESTData(MOCK_PATH); mockResponseDataStr = objectMapper.writeValueAsString(mockResponseData); httpClient = new HttpClient(httpClientOptions); - headers = ImmutableMap.of(AUTH_HEADER, String.format(AUTH_HEADER_VALUE_FORMAT, TOKEN)); + CredentialsProvider credentialsProvider = new BearTokenCredentialsProvider(TOKEN); + headers = credentialsProvider.authHeader(); } @After diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java new file mode 100644 index 000000000000..81b3ea57b703 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthSessionTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.auth; + +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.ThreadPoolUtils; + +import org.apache.commons.io.FileUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; + +import static org.apache.paimon.rest.auth.AuthSession.TOKEN_REFRESH_NUM_RETRIES; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** Test for {@link AuthSession}. */ +public class AuthSessionTest { + + @Rule public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void testRefreshBearTokenFileCredentialsProvider() + throws IOException, InterruptedException { + String fileName = "token"; + Pair tokenFile2Token = generateTokenAndWriteToFile(fileName); + String token = tokenFile2Token.getRight(); + File tokenFile = tokenFile2Token.getLeft(); + Map initialHeaders = new HashMap<>(); + long expiresInMillis = 1000L; + CredentialsProvider credentialsProvider = + new BearTokenFileCredentialsProvider(tokenFile.getPath(), expiresInMillis); + ScheduledExecutorService executor = + ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"); + AuthSession session = + AuthSession.fromRefreshCredentialsProvider( + executor, initialHeaders, credentialsProvider); + Map header = session.getHeaders(); + assertEquals(header.get("Authorization"), "Bearer " + token); + tokenFile.delete(); + tokenFile2Token = generateTokenAndWriteToFile(fileName); + token = tokenFile2Token.getRight(); + Thread.sleep(expiresInMillis + 500L); + header = session.getHeaders(); + assertEquals(header.get("Authorization"), "Bearer " + token); + } + + @Test + public void testRefreshCredentialsProviderIsSoonExpire() + throws IOException, InterruptedException { + String fileName = "token"; + Pair tokenFile2Token = generateTokenAndWriteToFile(fileName); + String token = tokenFile2Token.getRight(); + File tokenFile = tokenFile2Token.getLeft(); + Map initialHeaders = new HashMap<>(); + long expiresInMillis = 1000L; + CredentialsProvider credentialsProvider = + new BearTokenFileCredentialsProvider(tokenFile.getPath(), expiresInMillis); + AuthSession session = + AuthSession.fromRefreshCredentialsProvider( + null, initialHeaders, credentialsProvider); + Map header = session.getHeaders(); + assertEquals(header.get("Authorization"), "Bearer " + token); + tokenFile.delete(); + tokenFile2Token = generateTokenAndWriteToFile(fileName); + token = tokenFile2Token.getRight(); + tokenFile = tokenFile2Token.getLeft(); + FileUtils.writeStringToFile(tokenFile, token); + Thread.sleep( + (long) (expiresInMillis * (1 - BearTokenFileCredentialsProvider.EXPIRED_FACTOR)) + + 10L); + header = session.getHeaders(); + assertEquals(header.get("Authorization"), "Bearer " + token); + } + + @Test + public void testRetryWhenRefreshFail() throws Exception { + Map initialHeaders = new HashMap<>(); + CredentialsProvider credentialsProvider = + Mockito.mock(BearTokenFileCredentialsProvider.class); + long expiresAtMillis = System.currentTimeMillis() - 1000L; + when(credentialsProvider.expiresAtMillis()).thenReturn(Optional.of(expiresAtMillis)); + when(credentialsProvider.expiresInMills()).thenReturn(Optional.of(50L)); + when(credentialsProvider.supportRefresh()).thenReturn(true); + when(credentialsProvider.keepRefreshed()).thenReturn(true); + when(credentialsProvider.refresh()).thenReturn(false); + AuthSession session = + AuthSession.fromRefreshCredentialsProvider( + null, initialHeaders, credentialsProvider); + AuthSession.scheduleTokenRefresh( + ThreadPoolUtils.createScheduledThreadPool(1, "refresh-token"), + session, + expiresAtMillis); + Thread.sleep(10_000L); + verify(credentialsProvider, Mockito.times(TOKEN_REFRESH_NUM_RETRIES + 1)).refresh(); + } + + private Pair generateTokenAndWriteToFile(String fileName) throws IOException { + File tokenFile = folder.newFile(fileName); + String token = UUID.randomUUID().toString(); + FileUtils.writeStringToFile(tokenFile, token); + return Pair.of(tokenFile, token); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/auth/CredentialsProviderFactoryTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/auth/CredentialsProviderFactoryTest.java new file mode 100644 index 000000000000..e62a65a79aed --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/rest/auth/CredentialsProviderFactoryTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.auth; + +import org.apache.paimon.options.Options; +import org.apache.paimon.rest.RESTCatalogOptions; + +import org.apache.commons.io.FileUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.time.Duration; +import java.util.UUID; + +import static org.apache.paimon.rest.RESTCatalogInternalOptions.CREDENTIALS_PROVIDER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +/** Test for {@link CredentialsProviderFactory}. */ +public class CredentialsProviderFactoryTest { + + @Rule public TemporaryFolder folder = new TemporaryFolder(); + + @Test + public void testCreateBearTokenCredentialsProviderSuccess() { + Options options = new Options(); + String token = UUID.randomUUID().toString(); + options.set(RESTCatalogOptions.TOKEN, token); + BearTokenCredentialsProvider credentialsProvider = + (BearTokenCredentialsProvider) + CredentialsProviderFactory.createCredentialsProvider( + options, this.getClass().getClassLoader()); + assertEquals(token, credentialsProvider.token()); + } + + @Test + public void testCreateBearTokenCredentialsProviderFail() { + Options options = new Options(); + assertThrows( + IllegalArgumentException.class, + () -> + CredentialsProviderFactory.createCredentialsProvider( + options, this.getClass().getClassLoader())); + } + + @Test + public void testCreateBearTokenFileCredentialsProviderSuccess() throws Exception { + Options options = new Options(); + String fileName = "token"; + File tokenFile = folder.newFile(fileName); + String token = UUID.randomUUID().toString(); + FileUtils.writeStringToFile(tokenFile, token); + options.set(RESTCatalogOptions.TOKEN_PROVIDER_PATH, tokenFile.getPath()); + BearTokenFileCredentialsProvider credentialsProvider = + (BearTokenFileCredentialsProvider) + CredentialsProviderFactory.createCredentialsProvider( + options, this.getClass().getClassLoader()); + assertEquals(token, credentialsProvider.token()); + } + + @Test + public void testCreateBearTokenFileCredentialsProviderFail() throws Exception { + Options options = new Options(); + options.set(CREDENTIALS_PROVIDER, CredentialsProviderType.BEAR_TOKEN_FILE.name()); + assertThrows( + IllegalArgumentException.class, + () -> + CredentialsProviderFactory.createCredentialsProvider( + options, this.getClass().getClassLoader())); + } + + @Test + public void testCreateRefreshBearTokenFileCredentialsProviderSuccess() throws Exception { + Options options = new Options(); + String fileName = "token"; + File tokenFile = folder.newFile(fileName); + String token = UUID.randomUUID().toString(); + FileUtils.writeStringToFile(tokenFile, token); + options.set(RESTCatalogOptions.TOKEN_PROVIDER_PATH, tokenFile.getPath()); + options.set(RESTCatalogOptions.TOKEN_EXPIRATION_TIME, Duration.ofSeconds(10L)); + BearTokenFileCredentialsProvider credentialsProvider = + (BearTokenFileCredentialsProvider) + CredentialsProviderFactory.createCredentialsProvider( + options, this.getClass().getClassLoader()); + assertEquals(token, credentialsProvider.token()); + } + + @Test + public void getCredentialsProviderTypeByConfWhenDefineTokenPath() { + Options options = new Options(); + options.set(RESTCatalogOptions.TOKEN_PROVIDER_PATH, "/a/b/c"); + assertEquals( + CredentialsProviderType.BEAR_TOKEN_FILE, + CredentialsProviderFactory.getCredentialsProviderTypeByConf(options)); + } + + @Test + public void getCredentialsProviderTypeByConfWhenConfNotDefined() { + Options options = new Options(); + assertEquals( + CredentialsProviderType.BEAR_TOKEN, + CredentialsProviderFactory.getCredentialsProviderTypeByConf(options)); + } + + @Test + public void getCredentialsProviderTypeByConfWhenDefineProviderType() { + Options options = new Options(); + options.set(CREDENTIALS_PROVIDER, CredentialsProviderType.BEAR_TOKEN_FILE.name()); + assertEquals( + CredentialsProviderType.BEAR_TOKEN_FILE, + CredentialsProviderFactory.getCredentialsProviderTypeByConf(options)); + } +}