diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/ReactiveSocket.java b/reactivesocket-core/src/main/java/io/reactivesocket/ReactiveSocket.java index cf713cf69..7d8639158 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/ReactiveSocket.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/ReactiveSocket.java @@ -67,36 +67,6 @@ public interface ReactiveSocket { */ void start(Completable c); - /** - * Start and block the current thread until startup is finished. - * - * @throws RuntimeException - * of InterruptedException - */ - default void startAndWait() { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference err = new AtomicReference<>(); - start(new Completable() { - @Override - public void success() { - latch.countDown(); - } - - @Override - public void error(Throwable e) { - latch.countDown(); - } - }); - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if (err.get() != null) { - throw new RuntimeException(err.get()); - } - } - /** * Invoked when Requester is ready. Non-null exception if error. Null if success. * diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/util/Unsafe.java b/reactivesocket-core/src/main/java/io/reactivesocket/util/Unsafe.java index bdf0c0bbb..3c4ceffe7 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/util/Unsafe.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/util/Unsafe.java @@ -26,7 +26,6 @@ public void error(Throwable e) { }; rsc.start(completable); latch.await(); -// awaitAvailability(rsc); return rsc; } diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/TestTransportRequestN.java b/reactivesocket-core/src/test/java/io/reactivesocket/TestTransportRequestN.java index fe25022f6..fe40d1ffd 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/TestTransportRequestN.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/TestTransportRequestN.java @@ -17,6 +17,7 @@ import io.reactivesocket.internal.Publishers; import io.reactivesocket.lease.FairLeaseGovernor; +import io.reactivesocket.util.Unsafe; import io.reactivex.subscribers.TestSubscriber; import org.junit.After; import org.junit.Ignore; @@ -225,8 +226,8 @@ public Publisher handleMetadataPush(Payload payload) { err -> err.printStackTrace()); // start both the server and client and monitor for errors - socketServer.startAndWait(); - socketClient.startAndWait(); + Unsafe.startAndWait(socketServer); + Unsafe.startAndWait(socketClient); } @After diff --git a/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/AvailabilityMetricReactiveSocket.java b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/AvailabilityMetricReactiveSocket.java index f43335e69..8834e7850 100644 --- a/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/AvailabilityMetricReactiveSocket.java +++ b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/AvailabilityMetricReactiveSocket.java @@ -78,11 +78,6 @@ public void start(Completable c) { child.start(c); } - @Override - public void startAndWait() { - child.startAndWait(); - } - @Override public void onRequestReady(Consumer c) { child.onRequestReady(c); diff --git a/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/fireandforget/Fire.java b/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/fireandforget/Fire.java index 2b3e7db61..c9c332d80 100644 --- a/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/fireandforget/Fire.java +++ b/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/fireandforget/Fire.java @@ -23,6 +23,7 @@ import io.reactivesocket.aeron.client.AeronClientDuplexConnection; import io.reactivesocket.aeron.client.AeronClientDuplexConnectionFactory; import io.reactivesocket.aeron.client.FrameHolder; +import io.reactivesocket.util.Unsafe; import org.HdrHistogram.Recorder; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -62,8 +63,9 @@ public static void main(String... args) throws Exception { AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single(); System.out.println("Created duplex connection"); - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS)); - reactiveSocket.startAndWait(); + ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS); + ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, setupPayload); + Unsafe.startAndWait(reactiveSocket); CountDownLatch latch = new CountDownLatch(Integer.MAX_VALUE); diff --git a/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/requestreply/Ping.java b/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/requestreply/Ping.java index 7e7e7fa68..f37436007 100644 --- a/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/requestreply/Ping.java +++ b/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/requestreply/Ping.java @@ -22,6 +22,7 @@ import io.reactivesocket.aeron.client.AeronClientDuplexConnection; import io.reactivesocket.aeron.client.AeronClientDuplexConnectionFactory; import io.reactivesocket.aeron.client.FrameHolder; +import io.reactivesocket.util.Unsafe; import org.HdrHistogram.Recorder; import org.reactivestreams.Publisher; import rx.Observable; @@ -64,8 +65,9 @@ public static void main(String... args) throws Exception { AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single(); System.out.println("Created duplex connection"); - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS)); - reactiveSocket.startAndWait(); + ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS); + ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, setupPayload); + Unsafe.startAndWait(reactiveSocket); CountDownLatch latch = new CountDownLatch(Integer.MAX_VALUE); diff --git a/reactivesocket-transport-aeron/src/main/java/io/reactivesocket/aeron/server/ReactiveSocketAeronServer.java b/reactivesocket-transport-aeron/src/main/java/io/reactivesocket/aeron/server/ReactiveSocketAeronServer.java index 452158029..a38bd1c7b 100644 --- a/reactivesocket-transport-aeron/src/main/java/io/reactivesocket/aeron/server/ReactiveSocketAeronServer.java +++ b/reactivesocket-transport-aeron/src/main/java/io/reactivesocket/aeron/server/ReactiveSocketAeronServer.java @@ -29,6 +29,7 @@ import io.reactivesocket.aeron.internal.Loggable; import io.reactivesocket.aeron.internal.MessageType; import io.reactivesocket.rx.Observer; +import io.reactivesocket.util.Unsafe; import org.agrona.BitUtil; import org.agrona.DirectBuffer; import org.agrona.concurrent.UnsafeBuffer; @@ -174,7 +175,11 @@ public void accept(Throwable throwable) { sockets.put(sessionId, socket); - socket.startAndWait(); + try { + Unsafe.startAndWait(socket); + } catch (InterruptedException e) { + e.printStackTrace(); + } } else { debug("Unsupported stream id {}", streamId); } diff --git a/reactivesocket-transport-aeron/src/test/java/io/reactivesocket/aeron/client/ReactiveSocketAeronTest.java b/reactivesocket-transport-aeron/src/test/java/io/reactivesocket/aeron/client/ReactiveSocketAeronTest.java index 4ff58a8af..002369f10 100644 --- a/reactivesocket-transport-aeron/src/test/java/io/reactivesocket/aeron/client/ReactiveSocketAeronTest.java +++ b/reactivesocket-transport-aeron/src/test/java/io/reactivesocket/aeron/client/ReactiveSocketAeronTest.java @@ -26,6 +26,7 @@ import io.reactivesocket.aeron.server.ReactiveSocketAeronServer; import io.reactivesocket.exceptions.SetupException; import io.reactivesocket.test.TestUtil; +import io.reactivesocket.util.Unsafe; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; @@ -151,8 +152,9 @@ public Publisher apply(Payload payload) { AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single(); System.out.println("Created duplex connection"); - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS)); - reactiveSocket.startAndWait(); + ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS); + ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, setupPayload); + Unsafe.startAndWait(reactiveSocket); CountDownLatch latch = new CountDownLatch(count); @@ -225,8 +227,9 @@ public void requestStreamN(int count) throws Exception { AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single(); System.out.println("Created duplex connection"); - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS)); - reactiveSocket.startAndWait(); + ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS); + ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, setupPayload); + Unsafe.startAndWait(reactiveSocket); CountDownLatch latch = new CountDownLatch(count); Payload payload = TestUtil.utf8EncodedPayload("client_request", "client_metadata"); @@ -323,8 +326,9 @@ public Publisher handleMetadataPush(Payload payload) { AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single(); System.out.println("Created duplex connection => " + j); - ReactiveSocket client = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS)); - client.startAndWait(); + ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS); + ReactiveSocket client = DefaultReactiveSocket.fromClientConnection(connection, setupPayload); + Unsafe.startAndWait(client); Observable .range(1, 10) diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java index 43740f4b1..d2bcce038 100644 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java +++ b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java @@ -17,6 +17,7 @@ import io.reactivesocket.*; import io.reactivesocket.internal.rx.EmptySubscription; +import io.reactivesocket.util.Unsafe; import org.reactivestreams.Publisher; public class LocalClientReactiveSocketConnector implements ReactiveSocketConnector { @@ -35,7 +36,7 @@ public Publisher connect(Config config) { ReactiveSocket reactiveSocket = DefaultReactiveSocket .fromClientConnection(clientConnection, ConnectionSetupPayload.create(config.getMetadataMimeType(), config.getDataMimeType())); - reactiveSocket.startAndWait(); + Unsafe.startAndWait(reactiveSocket); s.onNext(reactiveSocket); s.onComplete(); diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java index 58ad1d62b..984618394 100644 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java +++ b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java @@ -17,6 +17,7 @@ import io.reactivesocket.*; import io.reactivesocket.internal.rx.EmptySubscription; +import io.reactivesocket.util.Unsafe; import org.reactivestreams.Publisher; public class LocalServerReactiveSocketConnector implements ReactiveSocketConnector { @@ -35,7 +36,7 @@ public Publisher connect(Config config) { ReactiveSocket reactiveSocket = DefaultReactiveSocket .fromServerConnection(clientConnection, config.getConnectionSetupHandler()); - reactiveSocket.startAndWait(); + Unsafe.startAndWait(reactiveSocket); s.onNext(reactiveSocket); s.onComplete(); } catch (Throwable t) { diff --git a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java index 221c2ae2e..f3d404824 100644 --- a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java +++ b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java @@ -25,6 +25,7 @@ import io.reactivesocket.LeaseGovernor; import io.reactivesocket.ReactiveSocket; import io.reactivesocket.transport.tcp.MutableDirectByteBuf; +import io.reactivesocket.util.Unsafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromServerConnection(connection, setupHandler, leaseGovernor, Throwable::printStackTrace); // Note: No blocking code here (still it should be refactored) - reactiveSocket.startAndWait(); + Unsafe.startAndWait(reactiveSocket); } @Override diff --git a/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/ClientServerTest.java b/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/ClientServerTest.java index ff239709c..d184de6a1 100644 --- a/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/ClientServerTest.java +++ b/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/ClientServerTest.java @@ -35,6 +35,7 @@ import io.reactivesocket.transport.websocket.client.ClientWebSocketDuplexConnection; import io.reactivesocket.transport.websocket.server.ReactiveSocketServerHandler; import io.reactivesocket.test.TestUtil; +import io.reactivesocket.util.Unsafe; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -131,8 +132,7 @@ protected void initChannel(Channel ch) throws Exception { client = DefaultReactiveSocket .fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), t -> t.printStackTrace()); - client.startAndWait(); - + Unsafe.startAndWait(client); } @AfterClass diff --git a/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Ping.java b/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Ping.java index 52e860986..8241f753d 100644 --- a/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Ping.java +++ b/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Ping.java @@ -21,6 +21,7 @@ import io.reactivesocket.Payload; import io.reactivesocket.ReactiveSocket; import io.reactivesocket.transport.websocket.client.ClientWebSocketDuplexConnection; +import io.reactivesocket.util.Unsafe; import org.HdrHistogram.Recorder; import org.reactivestreams.Publisher; import rx.Observable; @@ -47,7 +48,7 @@ public static void main(String... args) throws Exception { ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(duplexConnection, setupPayload, Throwable::printStackTrace); - reactiveSocket.startAndWait(); + Unsafe.startAndWait(reactiveSocket); byte[] data = "hello".getBytes(StandardCharsets.UTF_8);