diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java index 563aba477..c0d9cf921 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/ClientBuilder.java @@ -23,7 +23,9 @@ import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class ClientBuilder { private final ScheduledExecutorService executor; @@ -104,26 +106,59 @@ public ClientBuilder withSource(Publisher> source) { ); } - public ReactiveSocket build() { - if (source == null) { - throw new IllegalStateException("Please configure the source!"); - } - if (connector == null) { - throw new IllegalStateException("Please configure the connector!"); - } + public Publisher build() { + return subscriber -> { + subscriber.onSubscribe(new Subscription() { + private ScheduledFuture scheduledFuture = null; + private AtomicBoolean cancelled = new AtomicBoolean(false); + @Override + public void request(long n) { + if (source == null) { + subscriber.onError(new IllegalStateException("Please configure the source!")); + return; + } + if (executor == null) { + subscriber.onError(new IllegalStateException("Please configure the executor!")); + return; + } + if (connector == null) { + subscriber.onError(new IllegalStateException("Please configure the connector!")); + return; + } - ReactiveSocketConnector filterConnector = connector; - if (requestTimeout > 0) { - filterConnector = filterConnector - .chain(socket -> new TimeoutSocket(socket, requestTimeout, requestTimeoutUnit, executor)); - } - filterConnector = filterConnector.chain(DrainingSocket::new); - - Publisher>> factories = - sourceToFactory(source, filterConnector); + ReactiveSocketConnector filterConnector = connector; + if (requestTimeout > 0) { + filterConnector = filterConnector + .chain(socket -> new TimeoutSocket(socket, requestTimeout, requestTimeoutUnit, executor)); + } + filterConnector = filterConnector.chain(DrainingSocket::new); + + Publisher>> factories = + sourceToFactory(source, filterConnector); + LoadBalancer loadBalancer = new LoadBalancer<>(factories); + + scheduledFuture = executor.scheduleAtFixedRate(() -> { + if (loadBalancer.availability() > 0 && !cancelled.get()) { + subscriber.onNext(loadBalancer); + subscriber.onComplete(); + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + } + }, 1L, 50L, TimeUnit.MILLISECONDS); + } - return new LoadBalancer<>(factories); + @Override + public void cancel() { + if (cancelled.compareAndSet(false, true)) { + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + } + } + }); + }; } private Publisher>> sourceToFactory( @@ -136,8 +171,8 @@ private Publisher>> sourceToFactor @Override public void onSubscribe(Subscription s) { - subscriber.onSubscribe(s); current = Collections.emptyMap(); + subscriber.onSubscribe(s); } @Override diff --git a/reactivesocket-client/src/test/java/io/reactivesocket/client/ClientBuilderTest.java b/reactivesocket-client/src/test/java/io/reactivesocket/client/ClientBuilderTest.java new file mode 100644 index 000000000..481ad8bf7 --- /dev/null +++ b/reactivesocket-client/src/test/java/io/reactivesocket/client/ClientBuilderTest.java @@ -0,0 +1,71 @@ +package io.reactivesocket.client; + +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.ReactiveSocketConnector; +import io.reactivesocket.internal.Publishers; +import org.hamcrest.MatcherAssert; +import org.junit.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import rx.Observable; +import rx.observers.TestSubscriber; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; + +import static org.hamcrest.Matchers.instanceOf; +import static rx.RxReactiveStreams.toObservable; + +public class ClientBuilderTest { + + @Test(timeout = 10_000L) + public void testIllegalState() throws ExecutionException, InterruptedException { + // you need to specify the source and the connector + Publisher socketPublisher = ClientBuilder.instance().build(); + Observable socketObservable = toObservable(socketPublisher); + TestSubscriber testSubscriber = TestSubscriber.create(); + + socketObservable.subscribe(testSubscriber); + testSubscriber.awaitTerminalEvent(); + + testSubscriber.assertNoValues(); + testSubscriber.assertError(IllegalStateException.class); + } + + @Test(timeout = 10_000L) + public void testReturnedRSisAvailable() throws ExecutionException, InterruptedException { + + List addrs = Collections.singletonList( + InetSocketAddress.createUnresolved("localhost", 8080)); + Publisher> src = Publishers.just(addrs); + + ReactiveSocketConnector connector = + address -> Publishers.just(new TestingReactiveSocket(Function.identity())); + + Publisher socketPublisher = + ClientBuilder.instance() + .withSource(src) + .withConnector(connector) + .build(); + + Observable socketObservable = toObservable(socketPublisher); + TestSubscriber testSubscriber = TestSubscriber.create(); + socketObservable.subscribe(testSubscriber); + testSubscriber.awaitTerminalEvent(); + + testSubscriber.assertNoErrors(); + testSubscriber.assertValueCount(1); + testSubscriber.assertCompleted(); + + ReactiveSocket socket = (ReactiveSocket) testSubscriber.getOnNextEvents().get(0); + if (socket.availability() == 0.0) { + throw new AssertionError("Loadbalancer availability is zero!"); + } + } +} diff --git a/reactivesocket-examples/src/main/java/io/reactivesocket/examples/StressTest.java b/reactivesocket-examples/src/main/java/io/reactivesocket/examples/StressTest.java index 15a5efade..622a88ac5 100644 --- a/reactivesocket-examples/src/main/java/io/reactivesocket/examples/StressTest.java +++ b/reactivesocket-examples/src/main/java/io/reactivesocket/examples/StressTest.java @@ -116,14 +116,14 @@ public static void main(String... args) throws Exception { TcpReactiveSocketConnector tcp = TcpReactiveSocketConnector.create(setupPayload, Throwable::printStackTrace); - ReactiveSocket client = ClientBuilder.instance() + Publisher socketPublisher = ClientBuilder.instance() .withSource(getServersList()) .withConnector(tcp) .withConnectTimeout(1, TimeUnit.SECONDS) .withRequestTimeout(1, TimeUnit.SECONDS) .build(); - Unsafe.awaitAvailability(client); + ReactiveSocket client = Unsafe.blockingSingleWait(socketPublisher, 5, TimeUnit.SECONDS); System.out.println("Client ready, starting the load..."); long testDurationNs = TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);