From f06696cddecc8f54c663445858166094cf1fea11 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 8 Aug 2024 22:27:37 +0800 Subject: [PATCH 1/2] 1 --- .../arrowflight/DorisFlightSqlService.java | 4 ++ .../tokens/FlightTokenManagerImpl.java | 47 +++++++++++++++++-- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java index 85377788097b8f..df9099c6816080 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java @@ -57,6 +57,10 @@ public DorisFlightSqlService(int port) { DorisFlightSqlProducer producer = new DorisFlightSqlProducer(location, flightSessionsManager); flightServer = FlightServer.builder(allocator, location, producer) .headerAuthenticator(new FlightBearerTokenAuthenticator(flightTokenManager)).build(); + LOG.info("Arrow Flight SQL service is created, port: {}, token_cache_size: {}" + + ", qe_max_connection: {}, token_alive_time: {}", + port, Config.arrow_flight_token_cache_size, Config.qe_max_connection, + Config.arrow_flight_token_alive_time); } // start Arrow Flight SQL service, return true if success, otherwise false diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java index cd1b492de068cd..5803e81234a3c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java @@ -19,6 +19,7 @@ package org.apache.doris.service.arrowflight.tokens; +import org.apache.doris.catalog.Env; import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.arrowflight.auth2.FlightAuthResult; @@ -31,9 +32,11 @@ import com.google.common.cache.RemovalNotification; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; import java.math.BigInteger; import java.security.SecureRandom; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; /** @@ -47,6 +50,8 @@ public class FlightTokenManagerImpl implements FlightTokenManager { private final int cacheExpiration; private LoadingCache tokenCache; + // > + private ConcurrentHashMap> usersTokenLRU = new ConcurrentHashMap<>(); public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) { this.cacheSize = cacheSize; @@ -56,17 +61,19 @@ public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) { .expireAfterWrite(cacheExpiration, TimeUnit.MINUTES) .removalListener(new RemovalListener() { @Override - public void onRemoval(RemovalNotification notification) { + public void onRemoval(@NotNull RemovalNotification notification) { // TODO: broadcast this message to other FE - LOG.info("evict bearer token: " + notification.getKey() + ", reason: " + String token = notification.getKey(); + FlightTokenDetails tokenDetails = notification.getValue(); + LOG.info("evict bearer token: " + token + ", reason: token number exceeded, " + notification.getCause()); ConnectContext context = ExecuteEnv.getInstance().getScheduler() - .getContext(notification.getKey()); + .getContext(token); if (context != null) { ExecuteEnv.getInstance().getScheduler().unregisterConnection(context); - LOG.info("unregister flight connect context after evict bearer token: " - + notification.getKey()); + LOG.info("unregister flight connect context after evict bearer token: " + token); } + usersTokenLRU.get(tokenDetails.getUsername()).invalidate(token); } }).build(new CacheLoader() { @Override @@ -96,6 +103,30 @@ public FlightTokenDetails createToken(final String username, final FlightAuthRes flightAuthResult.getUserIdentity(), flightAuthResult.getRemoteIp()); tokenCache.put(token, flightTokenDetails); + if (usersTokenLRU.containsKey(username)) { + usersTokenLRU.get(username).put(token, 1); + } else { + // TODO Modify usersTokenLRU size when user property maxConn changes. but LoadingCache currently not + // support modify. + usersTokenLRU.put(username, + CacheBuilder.newBuilder().maximumSize(Env.getCurrentEnv().getAuth().getMaxConn(username) / 2) + .removalListener(new RemovalListener() { + @Override + public void onRemoval(@NotNull RemovalNotification notification) { + // TODO: broadcast this message to other FE + assert notification.getKey() != null; + tokenCache.invalidate(notification.getKey()); + LOG.info("evict bearer token: " + notification.getKey() + + ", reason: user connection exceeded, " + notification.getCause()); + } + }).build(new CacheLoader() { + @NotNull + @Override + public Integer load(@NotNull String key) { + return 1; + } + })); + } LOG.info("Created flight token for user: {}, token: {}", username, token); return flightTokenDetails; } @@ -114,6 +145,12 @@ public FlightTokenDetails validateToken(final String token) throws IllegalArgume throw new IllegalArgumentException("bearer token expired: " + token + ", try reconnect, " + "currently in fe.conf, `arrow_flight_token_alive_time`=" + this.cacheExpiration); } + if (usersTokenLRU.containsKey(value.getUsername())) { + usersTokenLRU.get(value.getUsername()).refresh(token); + } else { + throw new IllegalArgumentException( + "bearer token not created: " + token + ", username: " + value.getUsername()); + } LOG.info("Validated bearer token for user: {}", value.getUsername()); return value; } From a0ba97e67a652ca8a78ea8c7b3efb0bcb5ebe490 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 21 Aug 2024 22:50:11 +0800 Subject: [PATCH 2/2] 2 --- .../tokens/FlightTokenManagerImpl.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java index 5803e81234a3c2..57101d995e0c70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java @@ -37,6 +37,7 @@ import java.math.BigInteger; import java.security.SecureRandom; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** @@ -49,9 +50,9 @@ public class FlightTokenManagerImpl implements FlightTokenManager { private final int cacheSize; private final int cacheExpiration; - private LoadingCache tokenCache; + private final LoadingCache tokenCache; // > - private ConcurrentHashMap> usersTokenLRU = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> usersTokenLRU = new ConcurrentHashMap<>(); public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) { this.cacheSize = cacheSize; @@ -103,9 +104,7 @@ public FlightTokenDetails createToken(final String username, final FlightAuthRes flightAuthResult.getUserIdentity(), flightAuthResult.getRemoteIp()); tokenCache.put(token, flightTokenDetails); - if (usersTokenLRU.containsKey(username)) { - usersTokenLRU.get(username).put(token, 1); - } else { + if (!usersTokenLRU.containsKey(username)) { // TODO Modify usersTokenLRU size when user property maxConn changes. but LoadingCache currently not // support modify. usersTokenLRU.put(username, @@ -127,6 +126,7 @@ public Integer load(@NotNull String key) { } })); } + usersTokenLRU.get(username).put(token, 1); LOG.info("Created flight token for user: {}, token: {}", username, token); return flightTokenDetails; } @@ -146,7 +146,11 @@ public FlightTokenDetails validateToken(final String token) throws IllegalArgume + "currently in fe.conf, `arrow_flight_token_alive_time`=" + this.cacheExpiration); } if (usersTokenLRU.containsKey(value.getUsername())) { - usersTokenLRU.get(value.getUsername()).refresh(token); + try { + usersTokenLRU.get(value.getUsername()).get(token); + } catch (ExecutionException ignored) { + throw new IllegalArgumentException("usersTokenLRU not exist bearer token: " + token); + } } else { throw new IllegalArgumentException( "bearer token not created: " + token + ", username: " + value.getUsername());