Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public DorisFlightSqlService(int port) {
DorisFlightSqlProducer producer = new DorisFlightSqlProducer(location, flightSessionsManager);
flightServer = FlightServer.builder(allocator, location, producer)
.headerAuthenticator(new FlightBearerTokenAuthenticator(flightTokenManager)).build();
LOG.info("Arrow Flight SQL service is created, port: {}, token_cache_size: {}"
+ ", qe_max_connection: {}, token_alive_time: {}",
port, Config.arrow_flight_token_cache_size, Config.qe_max_connection,
Config.arrow_flight_token_alive_time);
}

// start Arrow Flight SQL service, return true if success, otherwise false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.doris.service.arrowflight.tokens;

import org.apache.doris.catalog.Env;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.arrowflight.auth2.FlightAuthResult;
Expand All @@ -31,9 +32,12 @@
import com.google.common.cache.RemovalNotification;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;

import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -46,7 +50,9 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
private final int cacheSize;
private final int cacheExpiration;

private LoadingCache<String, FlightTokenDetails> tokenCache;
private final LoadingCache<String, FlightTokenDetails> tokenCache;
// <username, <token, 1>>
private final ConcurrentHashMap<String, LoadingCache<String, Integer>> usersTokenLRU = new ConcurrentHashMap<>();

public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) {
this.cacheSize = cacheSize;
Expand All @@ -56,17 +62,19 @@ public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) {
.expireAfterWrite(cacheExpiration, TimeUnit.MINUTES)
.removalListener(new RemovalListener<String, FlightTokenDetails>() {
@Override
public void onRemoval(RemovalNotification<String, FlightTokenDetails> notification) {
public void onRemoval(@NotNull RemovalNotification<String, FlightTokenDetails> notification) {
// TODO: broadcast this message to other FE
LOG.info("evict bearer token: " + notification.getKey() + ", reason: "
String token = notification.getKey();
FlightTokenDetails tokenDetails = notification.getValue();
LOG.info("evict bearer token: " + token + ", reason: token number exceeded, "
+ notification.getCause());
ConnectContext context = ExecuteEnv.getInstance().getScheduler()
.getContext(notification.getKey());
.getContext(token);
if (context != null) {
ExecuteEnv.getInstance().getScheduler().unregisterConnection(context);
LOG.info("unregister flight connect context after evict bearer token: "
+ notification.getKey());
LOG.info("unregister flight connect context after evict bearer token: " + token);
}
usersTokenLRU.get(tokenDetails.getUsername()).invalidate(token);
}
}).build(new CacheLoader<String, FlightTokenDetails>() {
@Override
Expand Down Expand Up @@ -96,6 +104,29 @@ public FlightTokenDetails createToken(final String username, final FlightAuthRes
flightAuthResult.getUserIdentity(), flightAuthResult.getRemoteIp());

tokenCache.put(token, flightTokenDetails);
if (!usersTokenLRU.containsKey(username)) {
// TODO Modify usersTokenLRU size when user property maxConn changes. but LoadingCache currently not
// support modify.
usersTokenLRU.put(username,
CacheBuilder.newBuilder().maximumSize(Env.getCurrentEnv().getAuth().getMaxConn(username) / 2)
.removalListener(new RemovalListener<String, Integer>() {
@Override
public void onRemoval(@NotNull RemovalNotification<String, Integer> notification) {
// TODO: broadcast this message to other FE
assert notification.getKey() != null;
tokenCache.invalidate(notification.getKey());
LOG.info("evict bearer token: " + notification.getKey()
+ ", reason: user connection exceeded, " + notification.getCause());
}
}).build(new CacheLoader<String, Integer>() {
@NotNull
@Override
public Integer load(@NotNull String key) {
return 1;
}
}));
}
usersTokenLRU.get(username).put(token, 1);
LOG.info("Created flight token for user: {}, token: {}", username, token);
return flightTokenDetails;
}
Expand All @@ -114,6 +145,16 @@ public FlightTokenDetails validateToken(final String token) throws IllegalArgume
throw new IllegalArgumentException("bearer token expired: " + token + ", try reconnect, "
+ "currently in fe.conf, `arrow_flight_token_alive_time`=" + this.cacheExpiration);
}
if (usersTokenLRU.containsKey(value.getUsername())) {
try {
usersTokenLRU.get(value.getUsername()).get(token);
} catch (ExecutionException ignored) {
throw new IllegalArgumentException("usersTokenLRU not exist bearer token: " + token);
}
} else {
throw new IllegalArgumentException(
"bearer token not created: " + token + ", username: " + value.getUsername());
}
LOG.info("Validated bearer token for user: {}", value.getUsername());
return value;
}
Expand Down