Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,10 @@

/** Internal options for REST Catalog. */
public class RESTCatalogInternalOptions {

public static final ConfigOption<String> PREFIX =
ConfigOptions.key("prefix")
.stringType()
.noDefaultValue()
.withDescription("REST Catalog uri's prefix.");
public static final ConfigOption<String> CREDENTIALS_PROVIDER =
ConfigOptions.key("credentials-provider")
.stringType()
.noDefaultValue()
.withDescription("REST Catalog auth credentials provider.");
public static final ConfigOption<String> DATABASE_COMMENT =
ConfigOptions.key("comment")
.stringType()
.defaultValue(null)
.withDescription("REST Catalog database comment.");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.rest.auth;

import org.apache.paimon.options.Options;
import org.apache.paimon.rest.RESTCatalogOptions;
import org.apache.paimon.utils.StringUtils;

import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_EXPIRATION_TIME;
import static org.apache.paimon.rest.RESTCatalogOptions.TOKEN_PROVIDER_PATH;

/** Authentication provider. */
public interface AuthProvider {

Map<String, String> authHeader();

boolean refresh();

default boolean supportRefresh() {
return false;
}

default boolean keepRefreshed() {
return false;
}

default boolean willSoonExpire() {
return false;
}

default Optional<Long> expiresAtMillis() {
return Optional.empty();
}

default Optional<Long> expiresInMills() {
return Optional.empty();
}

static AuthProvider create(Options options) {
if (options.getOptional(RESTCatalogOptions.TOKEN_PROVIDER_PATH).isPresent()) {
if (!options.getOptional(TOKEN_PROVIDER_PATH).isPresent()) {
throw new IllegalArgumentException(TOKEN_PROVIDER_PATH.key() + " is required");
}
String tokenFilePath = options.get(TOKEN_PROVIDER_PATH);
if (options.getOptional(TOKEN_EXPIRATION_TIME).isPresent()) {
long tokenExpireInMills = options.get(TOKEN_EXPIRATION_TIME).toMillis();
return new BearTokenFileAuthProvider(tokenFilePath, tokenExpireInMills);

} else {
return new BearTokenFileAuthProvider(tokenFilePath);
}
} else {
if (options.getOptional(RESTCatalogOptions.TOKEN)
.map(StringUtils::isNullOrWhitespaceOnly)
.orElse(true)) {
throw new IllegalArgumentException(
RESTCatalogOptions.TOKEN.key() + " is required and not empty");
}
return new BearTokenAuthProvider(options.get(RESTCatalogOptions.TOKEN));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.options.Options;
import org.apache.paimon.rest.RESTCatalog;
import org.apache.paimon.rest.RESTUtil;

import org.slf4j.Logger;
Expand All @@ -34,37 +33,38 @@
import static org.apache.paimon.rest.RESTCatalog.HEADER_PREFIX;
import static org.apache.paimon.rest.RESTUtil.extractPrefixMap;

/** Auth session. */
/** Authentication session. */
public class AuthSession {

static final int TOKEN_REFRESH_NUM_RETRIES = 5;
static final int REFRESH_NUM_RETRIES = 5;
static final long MIN_REFRESH_WAIT_MILLIS = 10;
static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes

private static final Logger log = LoggerFactory.getLogger(AuthSession.class);
private final CredentialsProvider credentialsProvider;
private static final Logger LOG = LoggerFactory.getLogger(AuthSession.class);

private final AuthProvider authProvider;
private volatile Map<String, String> headers;

public AuthSession(Map<String, String> headers, CredentialsProvider credentialsProvider) {
this.credentialsProvider = credentialsProvider;
this.headers = RESTUtil.merge(headers, this.credentialsProvider.authHeader());
public AuthSession(Map<String, String> headers, AuthProvider authProvider) {
this.authProvider = authProvider;
this.headers = RESTUtil.merge(headers, this.authProvider.authHeader());
}

public static AuthSession fromRefreshCredentialsProvider(
public static AuthSession fromRefreshAuthProvider(
ScheduledExecutorService executor,
Map<String, String> headers,
CredentialsProvider credentialsProvider) {
AuthSession session = new AuthSession(headers, credentialsProvider);
AuthProvider authProvider) {
AuthSession session = new AuthSession(headers, authProvider);

long startTimeMillis = System.currentTimeMillis();
Optional<Long> expiresAtMillisOpt = credentialsProvider.expiresAtMillis();
Optional<Long> expiresAtMillisOpt = authProvider.expiresAtMillis();

// when init session if credentials expire time is in the past, refresh it and update
// when init session if token expire time is in the past, refresh it and update
// expiresAtMillis
if (expiresAtMillisOpt.isPresent() && expiresAtMillisOpt.get() <= startTimeMillis) {
boolean refreshSuccessful = session.refresh();
if (refreshSuccessful) {
expiresAtMillisOpt = session.credentialsProvider.expiresAtMillis();
expiresAtMillisOpt = session.authProvider.expiresAtMillis();
}
}

Expand All @@ -76,21 +76,20 @@ public static AuthSession fromRefreshCredentialsProvider(
}

public Map<String, String> getHeaders() {
if (this.credentialsProvider.keepRefreshed() && this.credentialsProvider.willSoonExpire()) {
if (this.authProvider.keepRefreshed() && this.authProvider.willSoonExpire()) {
refresh();
}
return headers;
}

public Boolean refresh() {
if (this.credentialsProvider.supportRefresh()
&& this.credentialsProvider.keepRefreshed()
&& this.credentialsProvider.expiresInMills().isPresent()) {
boolean isSuccessful = this.credentialsProvider.refresh();
if (this.authProvider.supportRefresh()
&& this.authProvider.keepRefreshed()
&& this.authProvider.expiresInMills().isPresent()) {
boolean isSuccessful = this.authProvider.refresh();
if (isSuccessful) {
Map<String, String> currentHeaders = this.headers;
this.headers =
RESTUtil.merge(currentHeaders, this.credentialsProvider.authHeader());
this.headers = RESTUtil.merge(currentHeaders, this.authProvider.authHeader());
}
return isSuccessful;
}
Expand Down Expand Up @@ -119,44 +118,45 @@ private static void scheduleTokenRefresh(
AuthSession session,
long expiresAtMillis,
int retryTimes) {
if (retryTimes < TOKEN_REFRESH_NUM_RETRIES) {
if (retryTimes < REFRESH_NUM_RETRIES) {
long expiresInMillis = expiresAtMillis - System.currentTimeMillis();
long timeToWait = getTimeToWaitByExpiresInMills(expiresInMillis);

executor.schedule(
() -> {
long refreshStartTime = System.currentTimeMillis();
boolean isSuccessful = session.refresh();
if (isSuccessful) {
scheduleTokenRefresh(
executor,
session,
refreshStartTime
+ session.credentialsProvider.expiresInMills().get(),
0);
} else {
scheduleTokenRefresh(
executor, session, expiresAtMillis, retryTimes + 1);
}
},
() -> doRefresh(executor, session, expiresAtMillis, retryTimes),
timeToWait,
TimeUnit.MILLISECONDS);
} else {
log.warn("Failed to refresh token after {} retries.", TOKEN_REFRESH_NUM_RETRIES);
LOG.warn("Failed to refresh token after {} retries.", REFRESH_NUM_RETRIES);
}
}

private static void doRefresh(
ScheduledExecutorService executor,
AuthSession session,
long expiresAtMillis,
int retryTimes) {
long refreshStartTime = System.currentTimeMillis();
boolean isSuccessful = session.refresh();
if (isSuccessful) {
scheduleTokenRefresh(
executor,
session,
refreshStartTime + session.authProvider.expiresInMills().get(),
0);
} else {
scheduleTokenRefresh(executor, session, expiresAtMillis, retryTimes + 1);
}
}

public static AuthSession createAuthSession(
Options options, ScheduledExecutorService refreshExecutor) {
Map<String, String> baseHeader = extractPrefixMap(options, HEADER_PREFIX);
CredentialsProvider credentialsProvider =
CredentialsProviderFactory.createCredentialsProvider(
options, RESTCatalog.class.getClassLoader());
if (credentialsProvider.keepRefreshed()) {
return AuthSession.fromRefreshCredentialsProvider(
refreshExecutor, baseHeader, credentialsProvider);
AuthProvider authProvider = AuthProvider.create(options);
if (authProvider.keepRefreshed()) {
return AuthSession.fromRefreshAuthProvider(refreshExecutor, baseHeader, authProvider);
} else {
return new AuthSession(baseHeader, credentialsProvider);
return new AuthSession(baseHeader, authProvider);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,29 @@

import java.util.Map;

/** Base bear token credentials provider. */
public abstract class BaseBearTokenCredentialsProvider implements CredentialsProvider {
/** Auth provider for bear token. */
public class BearTokenAuthProvider implements AuthProvider {

private static final String AUTHORIZATION_HEADER = "Authorization";
private static final String BEARER_PREFIX = "Bearer ";

protected String token;

public BearTokenAuthProvider(String token) {
this.token = token;
}

public String token() {
return token;
}

@Override
public Map<String, String> authHeader() {
return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token());
return ImmutableMap.of(AUTHORIZATION_HEADER, BEARER_PREFIX + token);
}

abstract String token();
@Override
public boolean refresh() {
return true;
}
}

This file was deleted.

This file was deleted.

Loading
Loading