diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java index 2a5bb708c..a86896d66 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java @@ -33,7 +33,7 @@ import java.util.function.Function; import java.util.stream.Collectors; -public class ClientBuilder { +public class ClientBuilder { private static AtomicInteger counter = new AtomicInteger(0); private final String name; @@ -49,10 +49,10 @@ public class ClientBuilder { private final int retries; - private final ReactiveSocketConnector connector; + private final ReactiveSocketConnector connector; private final Function retryThisException; - private final Publisher> source; + private final Publisher> source; private ClientBuilder( String name, @@ -61,8 +61,8 @@ private ClientBuilder( long connectTimeout, TimeUnit connectTimeoutUnit, double backupQuantile, int retries, Function retryThisException, - ReactiveSocketConnector connector, - Publisher> source + ReactiveSocketConnector connector, + Publisher> source ) { this.name = name; this.executor = executor; @@ -77,8 +77,8 @@ private ClientBuilder( this.source = source; } - public ClientBuilder withRequestTimeout(long timeout, TimeUnit unit) { - return new ClientBuilder( + public ClientBuilder withRequestTimeout(long timeout, TimeUnit unit) { + return new ClientBuilder<>( name, executor, timeout, unit, @@ -90,8 +90,8 @@ public ClientBuilder withRequestTimeout(long timeout, TimeUnit unit) { ); } - public ClientBuilder withConnectTimeout(long timeout, TimeUnit unit) { - return new ClientBuilder( + public ClientBuilder withConnectTimeout(long timeout, TimeUnit unit) { + return new ClientBuilder<>( name, executor, requestTimeout, requestTimeoutUnit, @@ -103,8 +103,8 @@ public ClientBuilder withConnectTimeout(long timeout, TimeUnit unit) { ); } - public ClientBuilder withExecutor(ScheduledExecutorService executor) { - return new ClientBuilder( + public ClientBuilder withExecutor(ScheduledExecutorService executor) { + return new ClientBuilder<>( name, executor, requestTimeout, requestTimeoutUnit, @@ -116,8 +116,8 @@ public ClientBuilder withExecutor(ScheduledExecutorService executor) { ); } - public ClientBuilder withConnector(ReactiveSocketConnector connector) { - return new ClientBuilder( + public ClientBuilder withConnector(ReactiveSocketConnector connector) { + return new ClientBuilder<>( name, executor, requestTimeout, requestTimeoutUnit, @@ -129,8 +129,8 @@ public ClientBuilder withConnector(ReactiveSocketConnector connec ); } - public ClientBuilder withSource(Publisher> source) { - return new ClientBuilder( + public ClientBuilder withSource(Publisher> source) { + return new ClientBuilder<>( name, executor, requestTimeout, requestTimeoutUnit, @@ -150,52 +150,47 @@ public ReactiveSocket build() { throw new IllegalStateException("Please configure the connector!"); } - ReactiveSocketConnector filterConnector = connector + ReactiveSocketConnector filterConnector = connector .chain(socket -> new TimeoutSocket(socket, requestTimeout, requestTimeoutUnit, executor)) .chain(DrainingSocket::new); - Publisher>> factories = + Publisher>> factories = sourceToFactory(source, filterConnector); - return new LoadBalancer(factories); + return new LoadBalancer(factories); } - private Publisher>> sourceToFactory( - Publisher> source, - ReactiveSocketConnector connector + private Publisher>> sourceToFactory( + Publisher> source, + ReactiveSocketConnector connector ) { return subscriber -> - source.subscribe(new Subscriber>() { - private Map> current; + source.subscribe(new Subscriber>() { + private Map> current; @Override public void onSubscribe(Subscription s) { subscriber.onSubscribe(s); - current = new HashMap<>(); + current = Collections.emptyMap(); } @Override - public void onNext(List socketAddresses) { - socketAddresses.stream() - .filter(sa -> !current.containsKey(sa)) - .map(connector::toFactory) - .map(factory -> factory.chain(TimeoutFactory.asChainFunction(connectTimeout, connectTimeoutUnit, - executor))) - .map(FailureAwareFactory::new) - .forEach(factory -> current.put(factory.remote(), factory)); - - Set addresses = new HashSet<>(socketAddresses); - Iterator>> it = - current.entrySet().iterator(); - while (it.hasNext()) { - SocketAddress sa = it.next().getKey(); - if (! addresses.contains(sa)) { - it.remove(); + public void onNext(Collection socketAddresses) { + Map> next = new HashMap<>(socketAddresses.size()); + for (T sa: socketAddresses) { + ReactiveSocketFactory factory = current.get(sa); + if (factory == null) { + ReactiveSocketFactory newFactory = connector.toFactory(sa); + newFactory = new TimeoutFactory<>(newFactory, connectTimeout, connectTimeoutUnit, executor); + newFactory = new FailureAwareFactory<>(newFactory); + next.put(sa, newFactory); + } else { + next.put(sa, factory); } } - List> factories = - current.values().stream().collect(Collectors.toList()); + current = next; + List> factories = new ArrayList<>(current.values()); subscriber.onNext(factories); } @@ -207,17 +202,14 @@ public void onNext(List socketAddresses) { }); } - public static ClientBuilder instance() { - return new ClientBuilder( + public static ClientBuilder instance() { + return new ClientBuilder<>( "rs-loadbalancer-" + counter.incrementAndGet(), - Executors.newScheduledThreadPool(4, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r); - thread.setName("reactivesocket-scheduler-thread"); - thread.setDaemon(true); - return thread; - } + Executors.newScheduledThreadPool(4, runnable -> { + Thread thread = new Thread(runnable); + thread.setName("reactivesocket-scheduler-thread"); + thread.setDaemon(true); + return thread; }), 1, TimeUnit.SECONDS, 10, TimeUnit.SECONDS, diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java index 22e201d45..748f66c11 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java @@ -18,10 +18,12 @@ import io.reactivesocket.Payload; import io.reactivesocket.ReactiveSocket; import io.reactivesocket.ReactiveSocketFactory; -import io.reactivesocket.exceptions.TransportException; +import io.reactivesocket.client.stat.Median; import io.reactivesocket.client.util.Clock; import io.reactivesocket.client.exception.NoAvailableReactiveSocketException; import io.reactivesocket.client.stat.Ewma; +import io.reactivesocket.exceptions.TimeoutException; +import io.reactivesocket.exceptions.TransportException; import io.reactivesocket.rx.Completable; import io.reactivesocket.client.stat.FrugalQuantile; import io.reactivesocket.client.stat.Quantile; @@ -32,36 +34,46 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; import java.util.*; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import java.util.stream.Collectors; /** * This {@link ReactiveSocket} implementation will load balance the request across a * pool of children ReactiveSockets. * It estimates the load of each ReactiveSocket based on statistics collected. */ -public class LoadBalancer implements ReactiveSocket { - private static Logger logger = LoggerFactory.getLogger(LoadBalancer .class); +public class LoadBalancer implements ReactiveSocket { + public static final double DEFAULT_EXP_FACTOR = 4.0; + public static final double DEFAULT_LOWER_QUANTILE = 0.2; + public static final double DEFAULT_HIGHER_QUANTILE = 0.8; + public static final double DEFAULT_MIN_PENDING = 1.0; + public static final double DEFAULT_MAX_PENDING = 2.0; + public static final int DEFAULT_MIN_APERTURE = 3; + public static final int DEFAULT_MAX_APERTURE = 100; + public static final long DEFAULT_MAX_REFRESH_PERIOD_MS = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES); - private static final double MIN_PENDINGS = 1.0; - private static final double MAX_PENDINGS = 2.0; - private static final int MIN_APERTURE = 3; - private static final int MAX_APERTURE = 100; + private static Logger logger = LoggerFactory.getLogger(LoadBalancer .class); private static final long APERTURE_REFRESH_PERIOD = Clock.unit().convert(15, TimeUnit.SECONDS); - private static final long MAX_REFRESH_PERIOD = Clock.unit().convert(5, TimeUnit.MINUTES); private static final int EFFORT = 5; + private final double minPendings; + private final double maxPendings; + private final int minAperture; + private final int maxAperture; + private final long maxRefreshPeriod; + private final double expFactor; private final Quantile lowerQuantile; private final Quantile higherQuantile; private int pendingSockets; - private final Map activeSockets; - private final Map> activeFactories; + private final List activeSockets; + private final List> activeFactories; private final FactoriesRefresher factoryRefresher; private Ewma pendings; @@ -70,25 +82,72 @@ public class LoadBalancer implements ReactiveSocket { private long refreshPeriod; private volatile long lastRefresh; - public LoadBalancer(Publisher>> factories) { - this.expFactor = 4.0; - this.lowerQuantile = new FrugalQuantile(0.2); - this.higherQuantile = new FrugalQuantile(0.8); - - this.activeSockets = new HashMap<>(); - this.activeFactories = new HashMap<>(); + /** + * + * @param factories the source (factories) of ReactiveSocket + * @param expFactor how aggressive is the algorithm toward outliers. A higher + * number means we send aggressively less traffic to a server + * slightly slower. + * @param lowQuantile the lower bound of the latency band of acceptable values. + * Any server below that value will be aggressively favored. + * @param highQuantile the higher bound of the latency band of acceptable values. + * Any server above that value will be aggressively penalized. + * @param minPendings The lower band of the average outstanding messages per server. + * @param maxPendings The higher band of the average outstanding messages per server. + * @param minAperture the minimum number of connections we want to maintain, + * independently of the load. + * @param maxAperture the maximum number of connections we want to maintain, + * independently of the load. + * @param maxRefreshPeriodMs the maximum time between two "refreshes" of the list of active + * ReactiveSocket. This is at that time that the slowest + * ReactiveSocket is closed. (unit is millisecond) + */ + public LoadBalancer( + Publisher>> factories, + double expFactor, + double lowQuantile, + double highQuantile, + double minPendings, + double maxPendings, + int minAperture, + int maxAperture, + long maxRefreshPeriodMs + ) { + this.expFactor = expFactor; + this.lowerQuantile = new FrugalQuantile(lowQuantile); + this.higherQuantile = new FrugalQuantile(highQuantile); + + this.activeSockets = new ArrayList<>(128); + this.activeFactories = new ArrayList<>(128); this.pendingSockets = 0; this.factoryRefresher = new FactoriesRefresher(); - this.pendings = new Ewma(15, TimeUnit.SECONDS, (MIN_PENDINGS + MAX_PENDINGS) / 2); - this.targetAperture = MIN_APERTURE; - this.lastApertureRefresh = 0L; - this.refreshPeriod = Clock.unit().convert(15, TimeUnit.SECONDS); + this.minPendings = minPendings; + this.maxPendings = maxPendings; + this.pendings = new Ewma(15, TimeUnit.SECONDS, (minPendings + maxPendings) / 2.0); + + this.minAperture = minAperture; + this.maxAperture = maxAperture; + this.targetAperture = minAperture; + + this.maxRefreshPeriod = Clock.unit().convert(maxRefreshPeriodMs, TimeUnit.MILLISECONDS); + this.lastApertureRefresh = Clock.now(); + this.refreshPeriod = Clock.unit().convert(15L, TimeUnit.SECONDS); this.lastRefresh = Clock.now(); factories.subscribe(factoryRefresher); } + public LoadBalancer(Publisher>> factories) { + this(factories, + DEFAULT_EXP_FACTOR, + DEFAULT_LOWER_QUANTILE, DEFAULT_HIGHER_QUANTILE, + DEFAULT_MIN_PENDING, DEFAULT_MAX_PENDING, + DEFAULT_MIN_APERTURE, DEFAULT_MAX_APERTURE, + DEFAULT_MAX_REFRESH_PERIOD_MS + ); + } + @Override public Publisher fireAndForget(Payload payload) { return subscriber -> select().fireAndForget(payload).subscribe(subscriber); @@ -101,7 +160,6 @@ public Publisher requestResponse(Payload payload) { @Override public Publisher requestSubscription(Payload payload) { - // TODO: deal with subscription & cie return subscriber -> select().requestSubscription(payload).subscribe(subscriber); } @@ -121,18 +179,62 @@ public Publisher requestChannel(Publisher payloads) { } private synchronized void addSockets(int numberOfNewSocket) { - activeFactories.entrySet() - .stream() - // available factories that don't map to an already established socket - .filter(e -> !activeSockets.containsKey(e.getKey())) - .map(e -> e.getValue()) - .filter(factory -> factory.availability() > 0.0) - .sorted((a, b) -> -Double.compare(a.availability(), b.availability())) - .limit(numberOfNewSocket) - .forEach(factory -> { - pendingSockets += 1; - factory.apply().subscribe(new SocketAdder(factory.remote())); - }); + int n = numberOfNewSocket; + if (n > activeFactories.size()) { + n = activeFactories.size(); + logger.info("addSockets({}) restricted by the number of factories, i.e. addSockets({})", + numberOfNewSocket, n); + } + + Random rng = ThreadLocalRandom.current(); + while (n > 0) { + int size = activeFactories.size(); + if (size == 1) { + ReactiveSocketFactory factory = activeFactories.get(0); + if (factory.availability() > 0.0) { + activeFactories.remove(0); + pendingSockets++; + factory.apply().subscribe(new SocketAdder(factory)); + } + break; + } + ReactiveSocketFactory factory0 = null; + ReactiveSocketFactory factory1 = null; + int i0 = 0; + int i1 = 0; + for (int i = 0; i < EFFORT; i++) { + i0 = rng.nextInt(size); + i1 = rng.nextInt(size - 1); + if (i1 >= i0) { + i1++; + } + factory0 = activeFactories.get(i0); + factory1 = activeFactories.get(i1); + if (factory0.availability() > 0.0 && factory1.availability() > 0.0) + break; + } + + if (factory0.availability() < factory1.availability()) { + n--; + pendingSockets++; + // cheaper to permute activeFactories.get(i1) with the last item and remove the last + // rather than doing a activeFactories.remove(i1) + if (i1 < size - 1) { + activeFactories.set(i1, activeFactories.get(size - 1)); + } + activeFactories.remove(size - 1); + factory1.apply().subscribe(new SocketAdder(factory1)); + } else { + n--; + pendingSockets++; + // c.f. above + if (i0 < size - 1) { + activeFactories.set(i0, activeFactories.get(size - 1)); + } + activeFactories.remove(size - 1); + factory0.apply().subscribe(new SocketAdder(factory0)); + } + } } private synchronized void refreshAperture() { @@ -142,7 +244,7 @@ private synchronized void refreshAperture() { } double p = 0.0; - for (WeightedSocket wrs: activeSockets.values()) { + for (WeightedSocket wrs: activeSockets) { p += wrs.getPending(); } p /= (n + pendingSockets); @@ -151,24 +253,30 @@ private synchronized void refreshAperture() { long now = Clock.now(); boolean underRateLimit = now - lastApertureRefresh > APERTURE_REFRESH_PERIOD; - int previous = targetAperture; if (avgPending < 1.0 && underRateLimit) { - targetAperture--; - lastApertureRefresh = now; - pendings.reset((MIN_PENDINGS + MAX_PENDINGS)/2); + updateAperture(targetAperture - 1, now); } else if (2.0 < avgPending && underRateLimit) { - targetAperture++; - lastApertureRefresh = now; - pendings.reset((MIN_PENDINGS + MAX_PENDINGS)/2); + updateAperture(targetAperture + 1, now); } - targetAperture = Math.max(MIN_APERTURE, targetAperture); - int maxAperture = Math.min(MAX_APERTURE, activeFactories.size()); + } + + /** + * Update the aperture value and ensure its value stays in the right range. + * @param newValue new aperture value + * @param now time of the change (for rate limiting purposes) + */ + private void updateAperture(int newValue, long now) { + int previous = targetAperture; + targetAperture = newValue; + targetAperture = Math.max(minAperture, targetAperture); + int maxAperture = Math.min(this.maxAperture, activeSockets.size() + activeFactories.size()); targetAperture = Math.min(maxAperture, targetAperture); + lastApertureRefresh = now; + pendings.reset((minPendings + maxPendings)/2); if (targetAperture != previous) { - logger.info("Current pending=" + avgPending - + ", new target=" + targetAperture - + ", previous target=" + previous); + logger.debug("Current pending={}, new target={}, previous target={}", + pendings.value(), targetAperture, previous); } } @@ -183,14 +291,12 @@ private synchronized void refreshSockets() { int n = pendingSockets + activeSockets.size(); if (n < targetAperture) { - logger.info("aperture " + n - + " is below target " + targetAperture - + ", adding " + (targetAperture - n) + " sockets"); + logger.info("aperture {} is below target {}, adding {} sockets", + n, targetAperture, targetAperture - n); addSockets(targetAperture - n); } else if (targetAperture < n) { - logger.info("aperture " + n - + " is above target " + targetAperture - + ", quicking 1 socket"); + logger.info("aperture {} is above target {}, quicking 1 socket", + n, targetAperture); quickSlowestRS(); } @@ -199,8 +305,8 @@ private synchronized void refreshSockets() { return; } else { long prev = refreshPeriod; - refreshPeriod = (long) Math.min(refreshPeriod * 1.5, MAX_REFRESH_PERIOD); - logger.info("Bumping refresh period, " + (prev/1000) + "->" + (refreshPeriod/1000)); + refreshPeriod = (long) Math.min(refreshPeriod * 1.5, maxRefreshPeriod); + logger.info("Bumping refresh period, {}->{}", prev/1000, refreshPeriod/1000); } lastRefresh = now; addSockets(1); @@ -211,40 +317,48 @@ private synchronized void quickSlowestRS() { return; } - activeSockets.entrySet().forEach(e -> { - SocketAddress key = e.getKey(); - WeightedSocket value = e.getValue(); - logger.info("> " + key + " -> " + value); + activeSockets.forEach(value -> { + logger.info("> " + value); }); - activeSockets.entrySet() - .stream() - .sorted((a,b) -> { - WeightedSocket socket1 = a.getValue(); - WeightedSocket socket2 = b.getValue(); - double load1 = 1.0/socket1.getPredictedLatency() * socket1.availability(); - double load2 = 1.0/socket2.getPredictedLatency() * socket2.availability(); - return Double.compare(load1, load2); - }) - .limit(1) - .forEach(entry -> { - SocketAddress key = entry.getKey(); - WeightedSocket slowest = entry.getValue(); - try { - logger.info("quicking slowest: " + key + " -> " + slowest); - activeSockets.remove(key); - slowest.close(); - } catch (Exception e) { - logger.warn("Exception while closing a ReactiveSocket", e); - } - }); + WeightedSocket slowest = null; + double lowestAvailability = Double.MAX_VALUE; + for (WeightedSocket socket: activeSockets) { + double load = socket.availability(); + if (load == 0.0) { + slowest = socket; + break; + } + if (socket.getPredictedLatency() != 0) { + load *= 1.0 / socket.getPredictedLatency(); + } + if (load < lowestAvailability) { + lowestAvailability = load; + slowest = socket; + } + } + + if (slowest != null) { + removeSocket(slowest); + } + } + + private synchronized void removeSocket(WeightedSocket socket) { + try { + logger.debug("Removing socket: -> " + socket); + activeSockets.remove(socket); + activeFactories.add(socket.getFactory()); + socket.close(); + } catch (Exception e) { + logger.warn("Exception while closing a ReactiveSocket", e); + } } @Override public synchronized double availability() { double currentAvailability = 0.0; if (!activeSockets.isEmpty()) { - for (WeightedSocket rs : activeSockets.values()) { + for (WeightedSocket rs : activeSockets) { currentAvailability += rs.availability(); } currentAvailability /= activeSockets.size(); @@ -275,7 +389,7 @@ public void onShutdown(Completable c) { @Override public synchronized void sendLease(int ttl, int numberOfRequests) { - activeSockets.values().forEach(socket -> + activeSockets.forEach(socket -> socket.sendLease(ttl, numberOfRequests) ); } @@ -296,9 +410,8 @@ private synchronized ReactiveSocket select() { refreshSockets(); int size = activeSockets.size(); - List buffer = activeSockets.values().stream().collect(Collectors.toList()); if (size == 1) { - return buffer.get(0); + return activeSockets.get(0); } WeightedSocket rsc1 = null; @@ -311,8 +424,8 @@ private synchronized ReactiveSocket select() { if (i2 >= i1) { i2++; } - rsc1 = buffer.get(i1); - rsc2 = buffer.get(i2); + rsc1 = activeSockets.get(i1); + rsc2 = activeSockets.get(i2); if (rsc1.availability() > 0.0 && rsc2.availability() > 0.0) break; } @@ -367,7 +480,7 @@ public synchronized void close() throws Exception { // TODO: have a `closed` flag? factoryRefresher.close(); activeFactories.clear(); - activeSockets.values().forEach(rs -> { + activeSockets.forEach(rs -> { try { rs.close(); } catch (Exception e) { @@ -376,47 +489,11 @@ public synchronized void close() throws Exception { }); } - private class RemoveItselfSubscriber implements Subscriber { - private Subscriber child; - private SocketAddress key; - - private RemoveItselfSubscriber(Subscriber child, SocketAddress key) { - this.child = child; - this.key = key; - } - - @Override - public void onSubscribe(Subscription s) { - child.onSubscribe(s); - } - - @Override - public void onNext(Payload payload) { - child.onNext(payload); - } - - @Override - public void onError(Throwable t) { - child.onError(t); - if (t instanceof TransportException) { - System.out.println(t + " removing socket " + child); - synchronized (LoadBalancer.this) { - activeSockets.remove(key); - } - } - } - - @Override - public void onComplete() { - child.onComplete(); - } - } - /** * This subscriber role is to subscribe to the list of server identifier, and update the * factory list. */ - private class FactoriesRefresher implements Subscriber>> { + private class FactoriesRefresher implements Subscriber>> { private Subscription subscription; @Override @@ -426,37 +503,58 @@ public void onSubscribe(Subscription subscription) { } @Override - public void onNext(List> newFactories) { - List> removed = computeRemoved(newFactories); + public void onNext(Collection> newFactories) { synchronized (LoadBalancer.this) { + + Set> current = + new HashSet<>(activeFactories.size() + activeSockets.size()); + current.addAll(activeFactories); + for (WeightedSocket socket: activeSockets) { + ReactiveSocketFactory factory = socket.getFactory(); + current.add(factory); + } + + Set> removed = new HashSet<>(current); + removed.removeAll(newFactories); + + Set> added = new HashSet<>(newFactories); + added.removeAll(current); + boolean changed = false; - for (ReactiveSocketFactory factory : removed) { - SocketAddress key = factory.remote(); - activeFactories.remove(key); - WeightedSocket removedSocket = activeSockets.remove(key); - try { - if (removedSocket != null) { + Iterator it0 = activeSockets.iterator(); + while (it0.hasNext()) { + WeightedSocket socket = it0.next(); + if (removed.contains(socket.getFactory())) { + it0.remove(); + try { changed = true; - removedSocket.close(); + socket.close(); + } catch (Exception e) { + logger.warn("Exception while closing a ReactiveSocket", e); } - } catch (Exception e) { - logger.warn("Exception while closing a ReactiveSocket", e); } } - - for (ReactiveSocketFactory factory : newFactories) { - if (!activeFactories.containsKey(factory.remote())) { - activeFactories.put(factory.remote(), factory); + Iterator> it1 = activeFactories.iterator(); + while (it1.hasNext()) { + ReactiveSocketFactory factory = it1.next(); + if (removed.contains(factory)) { + it1.remove(); changed = true; } } - if (changed && logger.isInfoEnabled()) { - String msg = "UPDATING ACTIVE FACTORIES"; - for (Map.Entry> e : activeFactories.entrySet()) { - msg += " + " + e.getKey() + ": " + e.getValue() + "\n"; + activeFactories.addAll(added); + + if (changed && logger.isDebugEnabled()) { + String msg = "\nUpdated active factories (size: " + activeFactories.size() + ")\n"; + for (ReactiveSocketFactory f : activeFactories) { + msg += " + " + f + "\n"; + } + msg += "Active sockets:\n"; + for (WeightedSocket socket: activeSockets) { + msg += " + " + socket + "\n"; } - logger.info(msg); + logger.debug(msg); } } refreshSockets(); @@ -475,37 +573,13 @@ public void onComplete() { void close() { subscription.cancel(); } - - private List> computeRemoved( - List> newFactories) { - ArrayList> removed = new ArrayList<>(); - - synchronized (LoadBalancer.this) { - for (Map.Entry> e : activeFactories.entrySet()) { - SocketAddress key = e.getKey(); - ReactiveSocketFactory factory = e.getValue(); - - boolean isRemoved = true; - for (ReactiveSocketFactory f : newFactories) { - if (f.remote() == key) { - isRemoved = false; - break; - } - } - if (isRemoved) { - removed.add(factory); - } - } - } - return removed; - } } private class SocketAdder implements Subscriber { - private final SocketAddress remote; + private final ReactiveSocketFactory factory; - private SocketAdder(SocketAddress remote) { - this.remote = remote; + private SocketAdder(ReactiveSocketFactory factory) { + this.factory = factory; } @Override @@ -520,12 +594,11 @@ public void onNext(ReactiveSocket rs) { quickSlowestRS(); } - ReactiveSocket proxy = new ReactiveSocketProxy(rs, - s -> new RemoveItselfSubscriber(s, remote)); - WeightedSocket weightedSocket = new WeightedSocket(proxy, lowerQuantile, higherQuantile); + WeightedSocket weightedSocket = new WeightedSocket(rs, factory, lowerQuantile, higherQuantile); logger.info("Adding new WeightedSocket " - + weightedSocket + " connected to " + remote); - activeSockets.put(remote, weightedSocket); + + weightedSocket + " connected to " + factory.remote()); + + activeSockets.add(weightedSocket); pendingSockets -= 1; } } @@ -535,6 +608,7 @@ public void onError(Throwable t) { logger.warn("Exception while subscribing to the ReactiveSocket source", t); synchronized (LoadBalancer.this) { pendingSockets -= 1; + activeFactories.add(factory); } } @@ -618,4 +692,286 @@ public void shutdown() {} @Override public void close() throws Exception {} } + + /** + * Wrapper of a ReactiveSocket, it computes statistics about the req/resp calls and + * update availability accordingly. + */ + private class WeightedSocket extends ReactiveSocketProxy { + private static final double STARTUP_PENALTY = Long.MAX_VALUE >> 12; + + private final ReactiveSocket child; + private ReactiveSocketFactory factory; + private final Quantile lowerQuantile; + private final Quantile higherQuantile; + private final long inactivityFactor; + + private volatile int pending; // instantaneous rate + private long stamp; // last timestamp we sent a request + private long stamp0; // last timestamp we sent a request or receive a response + private long duration; // instantaneous cumulative duration + + private Median median; + private Ewma interArrivalTime; + + private AtomicLong pendingStreams; // number of active streams + + WeightedSocket( + ReactiveSocket child, + ReactiveSocketFactory factory, + Quantile lowerQuantile, + Quantile higherQuantile, + int inactivityFactor + ) { + super(child); + this.child = child; + this.factory = factory; + this.lowerQuantile = lowerQuantile; + this.higherQuantile = higherQuantile; + this.inactivityFactor = inactivityFactor; + long now = Clock.now(); + this.stamp = now; + this.stamp0 = now; + this.duration = 0L; + this.pending = 0; + this.median = new Median(); + this.interArrivalTime = new Ewma(1, TimeUnit.MINUTES, 1000); + this.pendingStreams = new AtomicLong(); + } + + WeightedSocket( + ReactiveSocket child, + ReactiveSocketFactory factory, + Quantile lowerQuantile, + Quantile higherQuantile + ) { + this(child, factory, lowerQuantile, higherQuantile, 100); + } + + @Override + public Publisher requestResponse(Payload payload) { + return subscriber -> + child.requestResponse(payload).subscribe(new LatencySubscriber<>(subscriber, this)); + } + + @Override + public Publisher requestStream(Payload payload) { + return subscriber -> + child.requestStream(payload).subscribe(new CountingSubscriber<>(subscriber, this)); + } + + @Override + public Publisher requestSubscription(Payload payload) { + return subscriber -> + child.requestSubscription(payload).subscribe(new CountingSubscriber<>(subscriber, this)); + } + + @Override + public Publisher fireAndForget(Payload payload) { + return subscriber -> + child.fireAndForget(payload).subscribe(new CountingSubscriber<>(subscriber, this)); + } + + @Override + public Publisher metadataPush(Payload payload) { + return subscriber -> + child.metadataPush(payload).subscribe(new CountingSubscriber<>(subscriber, this)); + } + + @Override + public Publisher requestChannel(Publisher payloads) { + return subscriber -> + child.requestChannel(payloads).subscribe(new CountingSubscriber<>(subscriber, this)); + } + + ReactiveSocketFactory getFactory() { + return factory; + } + + synchronized double getPredictedLatency() { + long now = Clock.now(); + long elapsed = Math.max(now - stamp, 1L); + + double weight; + double prediction = median.estimation(); + + if (prediction == 0.0) { + if (pending == 0) { + weight = 0.0; // first request + } else { + // subsequent requests while we don't have any history + weight = STARTUP_PENALTY + pending; + } + } else if (pending == 0 && elapsed > inactivityFactor * interArrivalTime.value()) { + // if we did't see any data for a while, we decay the prediction by inserting + // artificial 0.0 into the median + median.insert(0.0); + weight = median.estimation(); + } else { + double predicted = prediction * pending; + double instant = instantaneous(now); + + if (predicted < instant) { // NB: (0.0 < 0.0) == false + weight = instant / pending; // NB: pending never equal 0 here + } else { + // we are under the predictions + weight = prediction; + } + } + + return weight; + } + + int getPending() { + return pending; + } + + private synchronized long instantaneous(long now) { + return duration + (now - stamp0) * pending; + } + + private synchronized long incr() { + long now = Clock.now(); + interArrivalTime.insert(now - stamp); + duration += Math.max(0, now - stamp0) * pending; + pending += 1; + stamp = now; + stamp0 = now; + return now; + } + + private synchronized long decr(long timestamp) { + long now = Clock.now(); + duration += Math.max(0, now - stamp0) * pending - (now - timestamp); + pending -= 1; + stamp0 = now; + return now; + } + + private synchronized void observe(double rtt) { + median.insert(rtt); + lowerQuantile.insert(rtt); + higherQuantile.insert(rtt); + } + + @Override + public void close() throws Exception { + child.close(); + } + + @Override + public String toString() { + return "WeightedSocket@" + hashCode() + + " [median:" + median.estimation() + + " quantile-low:" + lowerQuantile.estimation() + + " quantile-high:" + higherQuantile.estimation() + + " inter-arrival:" + interArrivalTime.value() + + " duration/pending:" + (pending == 0 ? 0 : (double)duration / pending) + + " availability: " + availability() + + "]->" + child.toString(); + } + + /** + * Subscriber wrapper used for request/response interaction model, measure and collect + * latency information. + */ + private class LatencySubscriber implements Subscriber { + private final Subscriber child; + private final WeightedSocket socket; + private final AtomicBoolean done; + private long start; + + LatencySubscriber(Subscriber child, WeightedSocket socket) { + this.child = child; + this.socket = socket; + this.done = new AtomicBoolean(false); + } + + @Override + public void onSubscribe(Subscription s) { + start = incr(); + child.onSubscribe(new Subscription() { + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + if (done.compareAndSet(false, true)) { + s.cancel(); + decr(start); + } + } + }); + } + + @Override + public void onNext(U u) { + child.onNext(u); + } + + @Override + public void onError(Throwable t) { + if (done.compareAndSet(false, true)) { + child.onError(t); + long now = decr(start); + if (t instanceof TransportException || t instanceof ClosedChannelException) { + removeSocket(socket); + } else if (t instanceof TimeoutException) { + observe(now - start); + } + } + } + + @Override + public void onComplete() { + if (done.compareAndSet(false, true)) { + long now = decr(start); + observe(now - start); + child.onComplete(); + } + } + } + + /** + * Subscriber wrapper used for stream like interaction model, it only counts the number of + * active streams + */ + private class CountingSubscriber implements Subscriber { + private final Subscriber child; + private final WeightedSocket socket; + + CountingSubscriber(Subscriber child, WeightedSocket socket) { + this.child = child; + this.socket = socket; + } + + @Override + public void onSubscribe(Subscription s) { + socket.pendingStreams.incrementAndGet(); + child.onSubscribe(s); + } + + @Override + public void onNext(U u) { + child.onNext(u); + } + + @Override + public void onError(Throwable t) { + socket.pendingStreams.decrementAndGet(); + child.onError(t); + if (t instanceof TransportException || t instanceof ClosedChannelException) { + removeSocket(socket); + } + } + + @Override + public void onComplete() { + socket.pendingStreams.decrementAndGet(); + child.onComplete(); + } + } + } } diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/WeightedSocket.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/WeightedSocket.java deleted file mode 100644 index a05340f86..000000000 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/WeightedSocket.java +++ /dev/null @@ -1,264 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.client; - -import io.reactivesocket.Payload; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.client.util.Clock; -import io.reactivesocket.client.stat.Ewma; -import io.reactivesocket.client.stat.Median; -import io.reactivesocket.client.stat.Quantile; -import io.reactivesocket.util.ReactiveSocketProxy; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Wrapper of a ReactiveSocket, it computes statistics about the req/resp calls and - * update availability accordingly. - */ -public class WeightedSocket extends ReactiveSocketProxy { - private static final double STARTUP_PENALTY = Long.MAX_VALUE >> 12; - - private final ReactiveSocket child; - private final Quantile lowerQuantile; - private final Quantile higherQuantile; - private final long inactivityFactor; - - private volatile int pending; // instantaneous rate - private long stamp; // last timestamp we sent a request - private long stamp0; // last timestamp we sent a request or receive a response - private long duration; // instantaneous cumulative duration - - private Median median; - private Ewma interArrivalTime; - - private AtomicLong pendingStreams; // number of active streams - - public WeightedSocket(ReactiveSocket child, Quantile lowerQuantile, Quantile higherQuantile, int inactivityFactor) { - super(child); - this.child = child; - this.lowerQuantile = lowerQuantile; - this.higherQuantile = higherQuantile; - this.inactivityFactor = inactivityFactor; - long now = Clock.now(); - this.stamp = now; - this.stamp0 = now; - this.duration = 0L; - this.pending = 0; - this.median = new Median(); - this.interArrivalTime = new Ewma(1, TimeUnit.MINUTES, 1000); - this.pendingStreams = new AtomicLong(); - } - - public WeightedSocket(ReactiveSocket child, Quantile lowerQuantile, Quantile higherQuantile) { - this(child, lowerQuantile, higherQuantile, 100); - } - - @Override - public Publisher requestResponse(Payload payload) { - return subscriber -> - child.requestResponse(payload).subscribe(new LatencySubscriber<>(subscriber)); - } - - @Override - public Publisher requestStream(Payload payload) { - return subscriber -> - child.requestStream(payload).subscribe(new CountingSubscriber<>(subscriber)); - } - - @Override - public Publisher requestSubscription(Payload payload) { - return subscriber -> - child.requestSubscription(payload).subscribe(new CountingSubscriber<>(subscriber)); - } - - @Override - public Publisher fireAndForget(Payload payload) { - return subscriber -> - child.fireAndForget(payload).subscribe(new CountingSubscriber<>(subscriber)); - } - - @Override - public Publisher metadataPush(Payload payload) { - return subscriber -> - child.metadataPush(payload).subscribe(new CountingSubscriber<>(subscriber)); - } - - @Override - public Publisher requestChannel(Publisher payloads) { - return subscriber -> - child.requestChannel(payloads).subscribe(new CountingSubscriber<>(subscriber)); - } - - public synchronized double getPredictedLatency() { - long now = Clock.now(); - long elapsed = Math.max(now - stamp, 1L); - - double weight; - double prediction = median.estimation(); - - if (prediction == 0.0) { - if (pending == 0) { - weight = 0.0; // first request - } else { - // subsequent requests while we don't have any history - weight = STARTUP_PENALTY + pending; - } - } else if (pending == 0 && elapsed > inactivityFactor * interArrivalTime.value()) { - // if we did't see any data for a while, we decay the prediction by inserting - // artificial 0.0 into the median - median.insert(0.0); - weight = median.estimation(); - } else { - double predicted = prediction * pending; - double instant = instantaneous(now); - - if (predicted < instant) { // NB: (0.0 < 0.0) == false - weight = instant / pending; // NB: pending never equal 0 here - } else { - // we are under the predictions - weight = prediction; - } - } - - return weight; - } - - public int getPending() { - return pending; - } - - private synchronized long instantaneous(long now) { - return duration + (now - stamp0) * pending; - } - - private synchronized long incr() { - long now = Clock.now(); - interArrivalTime.insert(now - stamp); - duration += Math.max(0, now - stamp0) * pending; - pending += 1; - stamp = now; - stamp0 = now; - return now; - } - - private synchronized long decr(long timestamp) { - long now = Clock.now(); - duration += Math.max(0, now - stamp0) * pending - (now - timestamp); - pending -= 1; - stamp0 = now; - return now; - } - - private synchronized void observe(double rtt) { - median.insert(rtt); - lowerQuantile.insert(rtt); - higherQuantile.insert(rtt); - } - - @Override - public void close() throws Exception { - child.close(); - } - - @Override - public String toString() { - return "WeightedSocket@" + hashCode() - + " [median:" + median.estimation() - + " quantile-low:" + lowerQuantile.estimation() - + " quantile-high:" + higherQuantile.estimation() - + " inter-arrival:" + interArrivalTime.value() - + " duration/pending:" + (pending == 0 ? 0 : (double)duration / pending) - + " availability: " + availability() - + "]->" + child.toString(); - } - - /** - * Subscriber wrapper used for request/response interaction model, measure and collect - * latency information. - */ - private class LatencySubscriber implements Subscriber { - private final Subscriber child; - private long start; - - LatencySubscriber(Subscriber child) { - this.child = child; - } - - @Override - public void onSubscribe(Subscription s) { - child.onSubscribe(s); - start = incr(); - } - - @Override - public void onNext(T t) { - child.onNext(t); - } - - @Override - public void onError(Throwable t) { - child.onError(t); - decr(start); - } - - @Override - public void onComplete() { - long now = decr(start); - observe(now - start); - child.onComplete(); - } - } - - /** - * Subscriber wrapper used for stream like interaction model, it only counts the number of - * active streams - */ - private class CountingSubscriber implements Subscriber { - private final Subscriber child; - - CountingSubscriber(Subscriber child) { - this.child = child; - } - - @Override - public void onSubscribe(Subscription s) { - pendingStreams.incrementAndGet(); - child.onSubscribe(s); - } - - @Override - public void onNext(T t) { - child.onNext(t); - } - - @Override - public void onError(Throwable t) { - pendingStreams.decrementAndGet(); - child.onError(t); - } - - @Override - public void onComplete() { - pendingStreams.decrementAndGet(); - child.onComplete(); - } - } -} diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/FailureAwareFactory.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/FailureAwareFactory.java index a275faa87..6b7858488 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/FailureAwareFactory.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/FailureAwareFactory.java @@ -140,11 +140,7 @@ public void onNext(U u) { @Override public void onError(Throwable t) { - if (t instanceof TransportException) { - errorPercentage.reset(0.0); - } else { - errorPercentage.insert(0.0); - } + errorPercentage.insert(0.0); child.onError(t); } diff --git a/reactivesocket-client/src/test/java/io/reactivesocket/client/LoadBalancerTest.java b/reactivesocket-client/src/test/java/io/reactivesocket/client/LoadBalancerTest.java index b220b9f2e..d36bbf15f 100644 --- a/reactivesocket-client/src/test/java/io/reactivesocket/client/LoadBalancerTest.java +++ b/reactivesocket-client/src/test/java/io/reactivesocket/client/LoadBalancerTest.java @@ -39,7 +39,7 @@ public void testNeverSelectFailingFactories() throws InterruptedException { TestingReactiveSocket socket = new TestingReactiveSocket(Function.identity()); ReactiveSocketFactory failing = failingFactory(local0); ReactiveSocketFactory succeeding = succeedingFactory(local1, socket); - List> factories = Arrays.asList(failing, succeeding); + List> factories = Arrays.asList(failing, succeeding); testBalancer(factories); } @@ -64,13 +64,13 @@ public double availability() { ReactiveSocketFactory failing = succeedingFactory(local0, failingSocket); ReactiveSocketFactory succeeding = succeedingFactory(local1, socket); - List> factories = Arrays.asList(failing, succeeding); + List> factories = Arrays.asList(failing, succeeding); testBalancer(factories); } - private void testBalancer(List> factories) throws InterruptedException { - Publisher>> src = s -> { + private void testBalancer(List> factories) throws InterruptedException { + Publisher>> src = s -> { s.onNext(factories); s.onComplete(); }; diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Publishers.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Publishers.java index 5b6fbce2a..64de92e84 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Publishers.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Publishers.java @@ -257,6 +257,7 @@ public void onNext(T t) { public void onError(Throwable t) { if (done.compareAndSet(false, true)) { doOnError(t); + super.cancel(); } } diff --git a/reactivesocket-examples/src/main/java/io/reactivesocket/examples/EchoClient.java b/reactivesocket-examples/src/main/java/io/reactivesocket/examples/EchoClient.java deleted file mode 100644 index 9d53cdc12..000000000 --- a/reactivesocket-examples/src/main/java/io/reactivesocket/examples/EchoClient.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.examples; - -import io.reactivesocket.ConnectionSetupHandler; -import io.reactivesocket.ConnectionSetupPayload; -import io.reactivesocket.Payload; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.RequestHandler; -import io.reactivesocket.client.ClientBuilder; -import io.reactivesocket.test.TestUtil; -import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector; -import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer; -import io.reactivesocket.util.Unsafe; -import rx.Observable; -import rx.RxReactiveStreams; - -import java.net.SocketAddress; -import java.util.Collections; - -public final class EchoClient { - - public static void main(String... args) throws Exception { - - ConnectionSetupHandler setupHandler = (setupPayload, reactiveSocket) -> { - return new RequestHandler.Builder() - .withRequestResponse( - payload -> RxReactiveStreams.toPublisher(Observable.just(payload))) - .build(); - }; - - SocketAddress serverAddress = TcpReactiveSocketServer.create() - .start(setupHandler) - .getServerAddress(); - - ConnectionSetupPayload setupPayload = - ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS); - - TcpReactiveSocketConnector tcp = TcpReactiveSocketConnector.create(setupPayload, Throwable::printStackTrace); - - ReactiveSocket client = ClientBuilder.instance() - .withSource(RxReactiveStreams.toPublisher(Observable.just(Collections.singletonList(serverAddress)))) - .withConnector(tcp) - .build(); - - Unsafe.awaitAvailability(client); - - Payload request = TestUtil.utf8EncodedPayload("Hello", "META"); - RxReactiveStreams.toObservable(client.requestResponse(request)) - .map(TestUtil::dataAsString) - .toBlocking() - .forEach(System.out::println); - } -} diff --git a/reactivesocket-examples/src/main/java/io/reactivesocket/examples/StressTest.java b/reactivesocket-examples/src/main/java/io/reactivesocket/examples/StressTest.java new file mode 100644 index 000000000..15a5efade --- /dev/null +++ b/reactivesocket-examples/src/main/java/io/reactivesocket/examples/StressTest.java @@ -0,0 +1,209 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.examples; + +import io.reactivesocket.ConnectionSetupHandler; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.RequestHandler; +import io.reactivesocket.client.ClientBuilder; +import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector; +import io.reactivesocket.util.Unsafe; +import io.reactivesocket.test.TestUtil; +import org.HdrHistogram.ConcurrentHistogram; +import org.HdrHistogram.Histogram; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer; +import rx.Observable; +import rx.RxReactiveStreams; +import rx.functions.Func1; + +public class StressTest { + private static AtomicInteger count = new AtomicInteger(0); + + private static SocketAddress startServer() throws InterruptedException { + // 25% of bad servers + boolean bad = count.incrementAndGet() % 4 == 3; + + ConnectionSetupHandler setupHandler = (setupPayload, reactiveSocket) -> + new RequestHandler.Builder() + .withRequestResponse( + payload -> + subscriber -> { + Subscription subscription = new Subscription() { + @Override + public void request(long n) { + if (bad) { + if (ThreadLocalRandom.current().nextInt(2) == 0) { + subscriber.onError(new Exception("SERVER EXCEPTION")); + } else { + // This will generate a timeout + //System.out.println("Server: No response"); + } + } else { + subscriber.onNext(TestUtil.utf8EncodedPayload("RESPONSE", "NO_META")); + subscriber.onComplete(); + } + } + + @Override + public void cancel() {} + }; + subscriber.onSubscribe(subscription); + } + ) + .build(); + + SocketAddress addr = new InetSocketAddress("127.0.0.1", 0); + TcpReactiveSocketServer.StartedServer server = TcpReactiveSocketServer.create(addr).start(setupHandler); + SocketAddress serverAddress = server.getServerAddress(); + return serverAddress; + } + + private static Publisher> getServersList() { + Observable> serverAddresses = Observable.interval(2, TimeUnit.SECONDS) + .map(new Func1>() { + List addresses = new ArrayList<>(); + + @Override + public List call(Long aLong) { + try { + SocketAddress socketAddress = startServer(); + System.out.println("Adding server " + socketAddress); + addresses.add(socketAddress); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (addresses.size() > 15) { + SocketAddress address = addresses.get(0); + System.out.println("Removing server " + address); + addresses.remove(address); + } + return new ArrayList<>(addresses); + } + }); + return RxReactiveStreams.toPublisher(serverAddresses); + } + + public static void main(String... args) throws Exception { + ConnectionSetupPayload setupPayload = + ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.HONOR_LEASE); + + TcpReactiveSocketConnector tcp = TcpReactiveSocketConnector.create(setupPayload, Throwable::printStackTrace); + + ReactiveSocket client = ClientBuilder.instance() + .withSource(getServersList()) + .withConnector(tcp) + .withConnectTimeout(1, TimeUnit.SECONDS) + .withRequestTimeout(1, TimeUnit.SECONDS) + .build(); + + Unsafe.awaitAvailability(client); + System.out.println("Client ready, starting the load..."); + + long testDurationNs = TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS); + AtomicInteger successes = new AtomicInteger(0); + AtomicInteger failures = new AtomicInteger(0); + + long start = System.nanoTime(); + ConcurrentHistogram histogram = new ConcurrentHistogram(TimeUnit.MINUTES.toNanos(1), 4); + histogram.setAutoResize(true); + + int concurrency = 100; + AtomicInteger outstandings = new AtomicInteger(0); + while (System.nanoTime() - start < testDurationNs) { + if (outstandings.get() <= concurrency) { + Payload request = TestUtil.utf8EncodedPayload("Hello", "META"); + client.requestResponse(request).subscribe(new MeasurerSusbcriber<>(histogram, successes, failures, outstandings)); + } else { + Thread.sleep(1); + } + } + + Thread.sleep(1000); + System.out.println(successes.get() + " events in " + (System.nanoTime() - start) / 1_000_000 + " ms"); + double rps = (1_000_000_000.0 * successes.get())/(System.nanoTime() - start); + System.out.println(rps + " rps"); + double rate = ((double) successes.get()) / (successes.get() + failures.get()); + System.out.println("successes: " + successes.get() + + ", failures: " + failures.get() + + ", success rate: " + rate); + System.out.println("Latency distribution in us"); + histogram.outputPercentileDistribution(System.out, 1000.0); + System.out.flush(); + } + + private static class MeasurerSusbcriber implements Subscriber { + private final Histogram histo; + private final AtomicInteger successes; + private final AtomicInteger failures; + private AtomicInteger outstandings; + private long start; + + private MeasurerSusbcriber( + Histogram histo, + AtomicInteger successes, + AtomicInteger failures, + AtomicInteger outstandings + ) { + this.histo = histo; + this.successes = successes; + this.failures = failures; + this.outstandings = outstandings; + } + + @Override + public void onSubscribe(Subscription s) { + start = System.nanoTime(); + outstandings.incrementAndGet(); + s.request(1L); + } + + @Override + public void onNext(T t) {} + + @Override + public void onError(Throwable t) { + record(); + System.err.println("Error: " + t); + failures.incrementAndGet(); + } + + @Override + public void onComplete() { + record(); + successes.incrementAndGet(); + } + + private void record() { + long elapsed = (System.nanoTime() - start) / 1000; + histo.recordValue(elapsed); + outstandings.decrementAndGet(); + } + } +} diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java index fd5bdf14e..53e9cb298 100644 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java @@ -21,7 +21,6 @@ import io.reactivesocket.internal.rx.BooleanDisposable; import io.reactivesocket.rx.Completable; import io.reactivesocket.rx.Observable; -import io.reactivesocket.rx.Observer; import io.reactivex.netty.channel.Connection; import org.reactivestreams.Publisher; import rx.RxReactiveStreams; @@ -41,40 +40,32 @@ public TcpDuplexConnection(Connection connection) { @Override public final Observable getInput() { - return new Observable() { - @Override - public void subscribe(Observer o) { - Subscriber subscriber = new ObserverSubscriber(o); - o.onSubscribe(new BooleanDisposable(new Runnable() { - @Override - public void run() { - subscriber.unsubscribe(); - } - })); - input.unsafeSubscribe(subscriber); - } + return o -> { + Subscriber subscriber = new ObserverSubscriber(o); + o.onSubscribe(new BooleanDisposable(subscriber::unsubscribe)); + input.unsafeSubscribe(subscriber); }; } @Override public void addOutput(Publisher o, Completable callback) { connection.writeAndFlushOnEach(RxReactiveStreams.toObservable(o)) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - callback.success(); - } + .subscribe(new Subscriber() { + @Override + public void onCompleted() { + callback.success(); + } - @Override - public void onError(Throwable e) { - callback.error(e); - } + @Override + public void onError(Throwable e) { + callback.error(e); + } - @Override - public void onNext(Void aVoid) { - // No Op. - } - }); + @Override + public void onNext(Void aVoid) { + // No Op. + } + }); } @Override