Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,36 +67,6 @@ public interface ReactiveSocket {
*/
void start(Completable c);

/**
* Start and block the current thread until startup is finished.
*
* @throws RuntimeException
* of InterruptedException
*/
default void startAndWait() {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Throwable> err = new AtomicReference<>();
start(new Completable() {
@Override
public void success() {
latch.countDown();
}

@Override
public void error(Throwable e) {
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (err.get() != null) {
throw new RuntimeException(err.get());
}
}

/**
* Invoked when Requester is ready. Non-null exception if error. Null if success.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public void error(Throwable e) {
};
rsc.start(completable);
latch.await();
// awaitAvailability(rsc);

return rsc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.reactivesocket.internal.Publishers;
import io.reactivesocket.lease.FairLeaseGovernor;
import io.reactivesocket.util.Unsafe;
import io.reactivex.subscribers.TestSubscriber;
import org.junit.After;
import org.junit.Ignore;
Expand Down Expand Up @@ -225,8 +226,8 @@ public Publisher<Void> handleMetadataPush(Payload payload) {
err -> err.printStackTrace());

// start both the server and client and monitor for errors
socketServer.startAndWait();
socketClient.startAndWait();
Unsafe.startAndWait(socketServer);
Unsafe.startAndWait(socketClient);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ public void start(Completable c) {
child.start(c);
}

@Override
public void startAndWait() {
child.startAndWait();
}

@Override
public void onRequestReady(Consumer<Throwable> c) {
child.onRequestReady(c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.reactivesocket.aeron.client.AeronClientDuplexConnection;
import io.reactivesocket.aeron.client.AeronClientDuplexConnectionFactory;
import io.reactivesocket.aeron.client.FrameHolder;
import io.reactivesocket.util.Unsafe;
import org.HdrHistogram.Recorder;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -62,8 +63,9 @@ public static void main(String... args) throws Exception {
AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single();
System.out.println("Created duplex connection");

ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS));
reactiveSocket.startAndWait();
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS);
ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, setupPayload);
Unsafe.startAndWait(reactiveSocket);

CountDownLatch latch = new CountDownLatch(Integer.MAX_VALUE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.reactivesocket.aeron.client.AeronClientDuplexConnection;
import io.reactivesocket.aeron.client.AeronClientDuplexConnectionFactory;
import io.reactivesocket.aeron.client.FrameHolder;
import io.reactivesocket.util.Unsafe;
import org.HdrHistogram.Recorder;
import org.reactivestreams.Publisher;
import rx.Observable;
Expand Down Expand Up @@ -64,8 +65,9 @@ public static void main(String... args) throws Exception {
AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single();
System.out.println("Created duplex connection");

ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS));
reactiveSocket.startAndWait();
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS);
ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, setupPayload);
Unsafe.startAndWait(reactiveSocket);

CountDownLatch latch = new CountDownLatch(Integer.MAX_VALUE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.reactivesocket.aeron.internal.Loggable;
import io.reactivesocket.aeron.internal.MessageType;
import io.reactivesocket.rx.Observer;
import io.reactivesocket.util.Unsafe;
import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
Expand Down Expand Up @@ -174,7 +175,11 @@ public void accept(Throwable throwable) {

sockets.put(sessionId, socket);

socket.startAndWait();
try {
Unsafe.startAndWait(socket);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
debug("Unsupported stream id {}", streamId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.reactivesocket.aeron.server.ReactiveSocketAeronServer;
import io.reactivesocket.exceptions.SetupException;
import io.reactivesocket.test.TestUtil;
import io.reactivesocket.util.Unsafe;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
Expand Down Expand Up @@ -151,8 +152,9 @@ public Publisher<Payload> apply(Payload payload) {
AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single();
System.out.println("Created duplex connection");

ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS));
reactiveSocket.startAndWait();
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS);
ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, setupPayload);
Unsafe.startAndWait(reactiveSocket);

CountDownLatch latch = new CountDownLatch(count);

Expand Down Expand Up @@ -225,8 +227,9 @@ public void requestStreamN(int count) throws Exception {
AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single();
System.out.println("Created duplex connection");

ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS));
reactiveSocket.startAndWait();
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS);
ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, setupPayload);
Unsafe.startAndWait(reactiveSocket);

CountDownLatch latch = new CountDownLatch(count);
Payload payload = TestUtil.utf8EncodedPayload("client_request", "client_metadata");
Expand Down Expand Up @@ -323,8 +326,9 @@ public Publisher<Void> handleMetadataPush(Payload payload) {
AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single();
System.out.println("Created duplex connection => " + j);

ReactiveSocket client = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS));
client.startAndWait();
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS);
ReactiveSocket client = DefaultReactiveSocket.fromClientConnection(connection, setupPayload);
Unsafe.startAndWait(client);

Observable
.range(1, 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.reactivesocket.*;
import io.reactivesocket.internal.rx.EmptySubscription;
import io.reactivesocket.util.Unsafe;
import org.reactivestreams.Publisher;

public class LocalClientReactiveSocketConnector implements ReactiveSocketConnector<LocalClientReactiveSocketConnector.Config> {
Expand All @@ -35,7 +36,7 @@ public Publisher<ReactiveSocket> connect(Config config) {
ReactiveSocket reactiveSocket = DefaultReactiveSocket
.fromClientConnection(clientConnection, ConnectionSetupPayload.create(config.getMetadataMimeType(), config.getDataMimeType()));

reactiveSocket.startAndWait();
Unsafe.startAndWait(reactiveSocket);

s.onNext(reactiveSocket);
s.onComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.reactivesocket.*;
import io.reactivesocket.internal.rx.EmptySubscription;
import io.reactivesocket.util.Unsafe;
import org.reactivestreams.Publisher;

public class LocalServerReactiveSocketConnector implements ReactiveSocketConnector<LocalServerReactiveSocketConnector.Config> {
Expand All @@ -35,7 +36,7 @@ public Publisher<ReactiveSocket> connect(Config config) {
ReactiveSocket reactiveSocket = DefaultReactiveSocket
.fromServerConnection(clientConnection, config.getConnectionSetupHandler());

reactiveSocket.startAndWait();
Unsafe.startAndWait(reactiveSocket);
s.onNext(reactiveSocket);
s.onComplete();
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.reactivesocket.LeaseGovernor;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.transport.tcp.MutableDirectByteBuf;
import io.reactivesocket.util.Unsafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -54,7 +55,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
ReactiveSocket reactiveSocket =
DefaultReactiveSocket.fromServerConnection(connection, setupHandler, leaseGovernor, Throwable::printStackTrace);
// Note: No blocking code here (still it should be refactored)
reactiveSocket.startAndWait();
Unsafe.startAndWait(reactiveSocket);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.reactivesocket.transport.websocket.client.ClientWebSocketDuplexConnection;
import io.reactivesocket.transport.websocket.server.ReactiveSocketServerHandler;
import io.reactivesocket.test.TestUtil;
import io.reactivesocket.util.Unsafe;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -131,8 +132,7 @@ protected void initChannel(Channel ch) throws Exception {
client = DefaultReactiveSocket
.fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), t -> t.printStackTrace());

client.startAndWait();

Unsafe.startAndWait(client);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.transport.websocket.client.ClientWebSocketDuplexConnection;
import io.reactivesocket.util.Unsafe;
import org.HdrHistogram.Recorder;
import org.reactivestreams.Publisher;
import rx.Observable;
Expand All @@ -47,7 +48,7 @@ public static void main(String... args) throws Exception {
ReactiveSocket reactiveSocket =
DefaultReactiveSocket.fromClientConnection(duplexConnection, setupPayload, Throwable::printStackTrace);

reactiveSocket.startAndWait();
Unsafe.startAndWait(reactiveSocket);

byte[] data = "hello".getBytes(StandardCharsets.UTF_8);

Expand Down