From 957a3203ac5a183bed2663d18a69fa4626287d79 Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Tue, 12 Jul 2016 14:38:52 -0700 Subject: [PATCH 1/3] `ClientBuilder.build` is now asynchronous. ***Problem*** The ClientBuilder `build` API return a ReactiveSocket. This ReactiveSocket is returned while connections are still in establishing mode, then it will fail until the connections established. The current solution is to call `Unsafe.awaitAvailability` which is cluncky. ***Solution*** Create a more robust API that return a `Publisher`, it is now obvious that the API is asynchronous. ***Modification*** I found a bug in `sourceToFactory`, where I was calling `subscriber.onSubscribe` before initializing the `current` Map. --- .../reactivesocket/client/ClientBuilder.java | 67 ++++++++---- .../client/ClientBuilderTest.java | 100 ++++++++++++++++++ .../reactivesocket/examples/StressTest.java | 4 +- 3 files changed, 151 insertions(+), 20 deletions(-) create mode 100644 reactivesocket-client/src/test/java/io/reactivesocket/client/ClientBuilderTest.java diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java index 563aba477..4f20d84c6 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java @@ -23,7 +23,9 @@ import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class ClientBuilder { private final ScheduledExecutorService executor; @@ -104,26 +106,55 @@ public ClientBuilder withSource(Publisher> source) { ); } - public ReactiveSocket build() { - if (source == null) { - throw new IllegalStateException("Please configure the source!"); - } - if (connector == null) { - throw new IllegalStateException("Please configure the connector!"); - } + public Publisher build() { + return subscriber -> { + subscriber.onSubscribe(new Subscription() { + private ScheduledFuture scheduledFuture = null; + private AtomicBoolean cancelled = new AtomicBoolean(false); + @Override + public void request(long n) { + if (source == null) { + subscriber.onError(new IllegalStateException("Please configure the source!")); + return; + } + if (connector == null) { + subscriber.onError(new IllegalStateException("Please configure the connector!")); + return; + } - ReactiveSocketConnector filterConnector = connector; - if (requestTimeout > 0) { - filterConnector = filterConnector - .chain(socket -> new TimeoutSocket(socket, requestTimeout, requestTimeoutUnit, executor)); - } - filterConnector = filterConnector.chain(DrainingSocket::new); - - Publisher>> factories = - sourceToFactory(source, filterConnector); + ReactiveSocketConnector filterConnector = connector; + if (requestTimeout > 0) { + filterConnector = filterConnector + .chain(socket -> new TimeoutSocket(socket, requestTimeout, requestTimeoutUnit, executor)); + } + filterConnector = filterConnector.chain(DrainingSocket::new); + + Publisher>> factories = + sourceToFactory(source, filterConnector); + LoadBalancer loadBalancer = new LoadBalancer<>(factories); + + scheduledFuture = executor.scheduleAtFixedRate(() -> { + if (loadBalancer.availability() > 0 && !cancelled.get()) { + subscriber.onNext(loadBalancer); + subscriber.onComplete(); + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + } + }, 1L, 50L, TimeUnit.MILLISECONDS); + } - return new LoadBalancer<>(factories); + @Override + public void cancel() { + if (cancelled.compareAndSet(false, true)) { + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + } + } + }); + }; } private Publisher>> sourceToFactory( @@ -136,8 +167,8 @@ private Publisher>> sourceToFactor @Override public void onSubscribe(Subscription s) { - subscriber.onSubscribe(s); current = Collections.emptyMap(); + subscriber.onSubscribe(s); } @Override diff --git a/reactivesocket-client/src/test/java/io/reactivesocket/client/ClientBuilderTest.java b/reactivesocket-client/src/test/java/io/reactivesocket/client/ClientBuilderTest.java new file mode 100644 index 000000000..958396fdb --- /dev/null +++ b/reactivesocket-client/src/test/java/io/reactivesocket/client/ClientBuilderTest.java @@ -0,0 +1,100 @@ +package io.reactivesocket.client; + +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.ReactiveSocketConnector; +import io.reactivesocket.internal.Publishers; +import org.hamcrest.MatcherAssert; +import org.junit.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; + +import static org.hamcrest.Matchers.instanceOf; + +public class ClientBuilderTest { + + @Test(timeout = 10_000L) + public void testIllegalState() throws ExecutionException, InterruptedException { + // you need to specify the source and the connector + Publisher socketPublisher = ClientBuilder.instance().build(); + + CountDownLatch latch = new CountDownLatch(1); + socketPublisher.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(1L); + } + + @Override + public void onNext(ReactiveSocket reactiveSocket) { + throw new AssertionError("onNext invoked when not expected."); + } + + @Override + public void onError(Throwable t) { + MatcherAssert.assertThat("Unexpected exception in onError", t, instanceOf(IllegalStateException.class)); + latch.countDown(); + } + + @Override + public void onComplete() { + throw new AssertionError("onComplete invoked when not expected."); + } + }); + + latch.await(); + } + + @Test(timeout = 10_000L) + public void testReturnedRSisAvailable() throws ExecutionException, InterruptedException { + + List addrs = Collections.singletonList( + InetSocketAddress.createUnresolved("localhost", 8080)); + Publisher> src = Publishers.just(addrs); + + ReactiveSocketConnector connector = + address -> Publishers.just(new TestingReactiveSocket(Function.identity())); + + Publisher socketPublisher = + ClientBuilder.instance() + .withSource(src) + .withConnector(connector) + .build(); + + CountDownLatch latch = new CountDownLatch(1); + socketPublisher.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(1L); + } + + @Override + public void onNext(ReactiveSocket reactiveSocket) { + // the returned ReactiveSocket must have an availability > 0.0 + if (reactiveSocket.availability() == 0.0) { + throw new AssertionError("Loadbalancer availability is zero!"); + } + } + + @Override + public void onError(Throwable t) { + throw new AssertionError("onError invoked when not expected."); + } + + @Override + public void onComplete() { + latch.countDown(); + } + }); + + latch.await(); + } +} diff --git a/reactivesocket-examples/src/main/java/io/reactivesocket/examples/StressTest.java b/reactivesocket-examples/src/main/java/io/reactivesocket/examples/StressTest.java index 15a5efade..622a88ac5 100644 --- a/reactivesocket-examples/src/main/java/io/reactivesocket/examples/StressTest.java +++ b/reactivesocket-examples/src/main/java/io/reactivesocket/examples/StressTest.java @@ -116,14 +116,14 @@ public static void main(String... args) throws Exception { TcpReactiveSocketConnector tcp = TcpReactiveSocketConnector.create(setupPayload, Throwable::printStackTrace); - ReactiveSocket client = ClientBuilder.instance() + Publisher socketPublisher = ClientBuilder.instance() .withSource(getServersList()) .withConnector(tcp) .withConnectTimeout(1, TimeUnit.SECONDS) .withRequestTimeout(1, TimeUnit.SECONDS) .build(); - Unsafe.awaitAvailability(client); + ReactiveSocket client = Unsafe.blockingSingleWait(socketPublisher, 5, TimeUnit.SECONDS); System.out.println("Client ready, starting the load..."); long testDurationNs = TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS); From b67f12915e71fa87e3c55dd068c078a9ab2ba2bb Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Tue, 12 Jul 2016 15:38:19 -0700 Subject: [PATCH 2/3] Check for null executor --- .../src/main/java/io/reactivesocket/client/ClientBuilder.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java index 4f20d84c6..c0d9cf921 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java @@ -118,6 +118,10 @@ public void request(long n) { subscriber.onError(new IllegalStateException("Please configure the source!")); return; } + if (executor == null) { + subscriber.onError(new IllegalStateException("Please configure the executor!")); + return; + } if (connector == null) { subscriber.onError(new IllegalStateException("Please configure the connector!")); return; From 5e8b37cf8a205ad6a0299b37909df67f26df747a Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Tue, 12 Jul 2016 15:55:23 -0700 Subject: [PATCH 3/3] Use TestingSubscriber --- .../client/ClientBuilderTest.java | 69 ++++++------------- 1 file changed, 20 insertions(+), 49 deletions(-) diff --git a/reactivesocket-client/src/test/java/io/reactivesocket/client/ClientBuilderTest.java b/reactivesocket-client/src/test/java/io/reactivesocket/client/ClientBuilderTest.java index 958396fdb..481ad8bf7 100644 --- a/reactivesocket-client/src/test/java/io/reactivesocket/client/ClientBuilderTest.java +++ b/reactivesocket-client/src/test/java/io/reactivesocket/client/ClientBuilderTest.java @@ -8,6 +8,8 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import rx.Observable; +import rx.observers.TestSubscriber; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -18,6 +20,7 @@ import java.util.function.Function; import static org.hamcrest.Matchers.instanceOf; +import static rx.RxReactiveStreams.toObservable; public class ClientBuilderTest { @@ -25,32 +28,14 @@ public class ClientBuilderTest { public void testIllegalState() throws ExecutionException, InterruptedException { // you need to specify the source and the connector Publisher socketPublisher = ClientBuilder.instance().build(); + Observable socketObservable = toObservable(socketPublisher); + TestSubscriber testSubscriber = TestSubscriber.create(); - CountDownLatch latch = new CountDownLatch(1); - socketPublisher.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(1L); - } + socketObservable.subscribe(testSubscriber); + testSubscriber.awaitTerminalEvent(); - @Override - public void onNext(ReactiveSocket reactiveSocket) { - throw new AssertionError("onNext invoked when not expected."); - } - - @Override - public void onError(Throwable t) { - MatcherAssert.assertThat("Unexpected exception in onError", t, instanceOf(IllegalStateException.class)); - latch.countDown(); - } - - @Override - public void onComplete() { - throw new AssertionError("onComplete invoked when not expected."); - } - }); - - latch.await(); + testSubscriber.assertNoValues(); + testSubscriber.assertError(IllegalStateException.class); } @Test(timeout = 10_000L) @@ -69,32 +54,18 @@ public void testReturnedRSisAvailable() throws ExecutionException, InterruptedEx .withConnector(connector) .build(); - CountDownLatch latch = new CountDownLatch(1); - socketPublisher.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(1L); - } - - @Override - public void onNext(ReactiveSocket reactiveSocket) { - // the returned ReactiveSocket must have an availability > 0.0 - if (reactiveSocket.availability() == 0.0) { - throw new AssertionError("Loadbalancer availability is zero!"); - } - } - - @Override - public void onError(Throwable t) { - throw new AssertionError("onError invoked when not expected."); - } + Observable socketObservable = toObservable(socketPublisher); + TestSubscriber testSubscriber = TestSubscriber.create(); + socketObservable.subscribe(testSubscriber); + testSubscriber.awaitTerminalEvent(); - @Override - public void onComplete() { - latch.countDown(); - } - }); + testSubscriber.assertNoErrors(); + testSubscriber.assertValueCount(1); + testSubscriber.assertCompleted(); - latch.await(); + ReactiveSocket socket = (ReactiveSocket) testSubscriber.getOnNextEvents().get(0); + if (socket.availability() == 0.0) { + throw new AssertionError("Loadbalancer availability is zero!"); + } } }