Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,120 +23,29 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ClientBuilder {
private static AtomicInteger counter = new AtomicInteger(0);
private final String name;

private final ScheduledExecutorService executor;

private final long requestTimeout;
private final TimeUnit requestTimeoutUnit;

private final long connectTimeout;
private final TimeUnit connectTimeoutUnit;

private final double backupQuantile;

private final int retries;

private final ReactiveSocketConnector<SocketAddress> connector;
private final Function<Throwable, Boolean> retryThisException;

private final Publisher<List<SocketAddress>> source;
public class ClientBuilder<T> {
private final ReactiveSocketConnector<T> connector;
private final Publisher<? extends Collection<T>> source;

private ClientBuilder(
String name,
ScheduledExecutorService executor,
long requestTimeout, TimeUnit requestTimeoutUnit,
long connectTimeout, TimeUnit connectTimeoutUnit,
double backupQuantile,
int retries, Function<Throwable, Boolean> retryThisException,
ReactiveSocketConnector<SocketAddress> connector,
Publisher<List<SocketAddress>> source
ReactiveSocketConnector<T> connector,
Publisher<? extends Collection<T>> source
) {
this.name = name;
this.executor = executor;
this.requestTimeout = requestTimeout;
this.requestTimeoutUnit = requestTimeoutUnit;
this.connectTimeout = connectTimeout;
this.connectTimeoutUnit = connectTimeoutUnit;
this.backupQuantile = backupQuantile;
this.retries = retries;
this.connector = connector;
this.retryThisException = retryThisException;
this.source = source;
}

public ClientBuilder withRequestTimeout(long timeout, TimeUnit unit) {
return new ClientBuilder(
name,
executor,
timeout, unit,
connectTimeout, connectTimeoutUnit,
backupQuantile,
retries, retryThisException,
connector,
source
);
}

public ClientBuilder withConnectTimeout(long timeout, TimeUnit unit) {
return new ClientBuilder(
name,
executor,
requestTimeout, requestTimeoutUnit,
timeout, unit,
backupQuantile,
retries, retryThisException,
public ClientBuilder<T> withConnector(ReactiveSocketConnector<T> connector) {
return new ClientBuilder<>(
connector,
source
);
}

public ClientBuilder withExecutor(ScheduledExecutorService executor) {
return new ClientBuilder(
name,
executor,
requestTimeout, requestTimeoutUnit,
connectTimeout, connectTimeoutUnit,
backupQuantile,
retries, retryThisException,
connector,
source
);
}

public ClientBuilder withConnector(ReactiveSocketConnector<SocketAddress> connector) {
return new ClientBuilder(
name,
executor,
requestTimeout, requestTimeoutUnit,
connectTimeout, connectTimeoutUnit,
backupQuantile,
retries, retryThisException,
connector,
source
);
}

public ClientBuilder withSource(Publisher<List<SocketAddress>> source) {
return new ClientBuilder(
name,
executor,
requestTimeout, requestTimeoutUnit,
connectTimeout, connectTimeoutUnit,
backupQuantile,
retries, retryThisException,
public ClientBuilder<T> withSource(Publisher<? extends Collection<T>> source) {
return new ClientBuilder<>(
connector,
source
);
Expand All @@ -150,53 +59,43 @@ public ReactiveSocket build() {
throw new IllegalStateException("Please configure the connector!");
}

ReactiveSocketConnector<SocketAddress> filterConnector = connector
.chain(socket -> new TimeoutSocket(socket, requestTimeout, requestTimeoutUnit, executor))
ReactiveSocketConnector<T> filterConnector = connector
.chain(DrainingSocket::new);

Publisher<List<ReactiveSocketFactory<SocketAddress>>> factories =
Publisher<? extends Collection<ReactiveSocketFactory<T>>> factories =
sourceToFactory(source, filterConnector);

return new LoadBalancer(factories);
return new LoadBalancer<>(factories);
}

private Publisher<List<ReactiveSocketFactory<SocketAddress>>> sourceToFactory(
Publisher<List<SocketAddress>> source,
ReactiveSocketConnector<SocketAddress> connector
private Publisher<? extends Collection<ReactiveSocketFactory<T>>> sourceToFactory(
Publisher<? extends Collection<T>> source,
ReactiveSocketConnector<T> connector
) {
return subscriber ->
source.subscribe(new Subscriber<List<SocketAddress>>() {
private Map<SocketAddress, ReactiveSocketFactory<SocketAddress>> current;
source.subscribe(new Subscriber<Collection<T>>() {
private Map<T, ReactiveSocketFactory<T>> current;

@Override
public void onSubscribe(Subscription s) {
subscriber.onSubscribe(s);
current = new HashMap<>();
current = Collections.emptyMap();
}

@Override
public void onNext(List<SocketAddress> socketAddresses) {
socketAddresses.stream()
.filter(sa -> !current.containsKey(sa))
.map(connector::toFactory)
.map(factory -> factory.chain(TimeoutFactory.asChainFunction(connectTimeout, connectTimeoutUnit,
executor)))
.map(FailureAwareFactory::new)
.forEach(factory -> current.put(factory.remote(), factory));

Set<SocketAddress> addresses = new HashSet<>(socketAddresses);
Iterator<Map.Entry<SocketAddress, ReactiveSocketFactory<SocketAddress>>> it =
current.entrySet().iterator();
while (it.hasNext()) {
SocketAddress sa = it.next().getKey();
if (! addresses.contains(sa)) {
it.remove();
public void onNext(Collection<T> socketAddresses) {
Map<T, ReactiveSocketFactory<T>> next = new HashMap<>(socketAddresses.size());
for (T sa: socketAddresses) {
ReactiveSocketFactory<T> factory;
if ((factory = current.get(sa)) == null) {
next.put(sa, new FailureAwareFactory<>(connector.toFactory(sa)));
} else {
next.put(sa, factory);
}
}

List<ReactiveSocketFactory<SocketAddress>> factories =
current.values().stream().collect(Collectors.toList());
subscriber.onNext(factories);
current = next;
subscriber.onNext(current.values());
}

@Override
Expand All @@ -207,22 +106,8 @@ public void onNext(List<SocketAddress> socketAddresses) {
});
}

public static ClientBuilder instance() {
return new ClientBuilder(
"rs-loadbalancer-" + counter.incrementAndGet(),
Executors.newScheduledThreadPool(4, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("reactivesocket-scheduler-thread");
thread.setDaemon(true);
return thread;
}
}),
1, TimeUnit.SECONDS,
10, TimeUnit.SECONDS,
0.99,
3, t -> true,
public static <T> ClientBuilder<T> instance() {
return new ClientBuilder<>(
null,
null
);
Expand Down
Loading