From ca186101cafcd97eba3463d310ae05296b07fcfa Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Tue, 12 Jul 2016 16:05:51 -0700 Subject: [PATCH 1/2] ReactiveSocket: Remove `startAndWait` method ***Problem*** ReactiveSocket shouldn't have a public API encouraging usage of blocking code. ***Solution*** Remove the method, replace it by `start` where it was easy and by `Unsafe.startAndWait` elsewhere. --- .../io/reactivesocket/ReactiveSocket.java | 30 --- .../java/io/reactivesocket/util/Unsafe.java | 1 - .../reactivesocket/TestTransportRequestN.java | 5 +- .../AvailabilityMetricReactiveSocket.java | 5 - .../aeron/example/fireandforget/Fire.java | 6 +- .../aeron/example/requestreply/Ping.java | 6 +- .../server/ReactiveSocketAeronServer.java | 7 +- .../aeron/client/ReactiveSocketAeronTest.java | 16 +- reactivesocket-transport-local/build.gradle | 19 -- .../local/LocalClientDuplexConnection.java | 107 ---------- .../LocalClientReactiveSocketConnector.java | 71 ------- .../local/LocalReactiveSocketManager.java | 54 ----- .../local/LocalServerDuplexConection.java | 108 ---------- .../LocalServerReactiveSocketConnector.java | 64 ------ .../local/ClientServerTest.java | 185 ------------------ .../server/ReactiveSocketServerHandler.java | 3 +- .../transport/websocket/ClientServerTest.java | 4 +- .../transport/websocket/Ping.java | 3 +- settings.gradle | 1 - 19 files changed, 33 insertions(+), 662 deletions(-) delete mode 100644 reactivesocket-transport-local/build.gradle delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java delete mode 100644 reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/ReactiveSocket.java b/reactivesocket-core/src/main/java/io/reactivesocket/ReactiveSocket.java index cf713cf69..7d8639158 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/ReactiveSocket.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/ReactiveSocket.java @@ -67,36 +67,6 @@ public interface ReactiveSocket { */ void start(Completable c); - /** - * Start and block the current thread until startup is finished. - * - * @throws RuntimeException - * of InterruptedException - */ - default void startAndWait() { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference err = new AtomicReference<>(); - start(new Completable() { - @Override - public void success() { - latch.countDown(); - } - - @Override - public void error(Throwable e) { - latch.countDown(); - } - }); - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if (err.get() != null) { - throw new RuntimeException(err.get()); - } - } - /** * Invoked when Requester is ready. Non-null exception if error. Null if success. * diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/util/Unsafe.java b/reactivesocket-core/src/main/java/io/reactivesocket/util/Unsafe.java index bdf0c0bbb..3c4ceffe7 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/util/Unsafe.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/util/Unsafe.java @@ -26,7 +26,6 @@ public void error(Throwable e) { }; rsc.start(completable); latch.await(); -// awaitAvailability(rsc); return rsc; } diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/TestTransportRequestN.java b/reactivesocket-core/src/test/java/io/reactivesocket/TestTransportRequestN.java index fe25022f6..fe40d1ffd 100644 --- a/reactivesocket-core/src/test/java/io/reactivesocket/TestTransportRequestN.java +++ b/reactivesocket-core/src/test/java/io/reactivesocket/TestTransportRequestN.java @@ -17,6 +17,7 @@ import io.reactivesocket.internal.Publishers; import io.reactivesocket.lease.FairLeaseGovernor; +import io.reactivesocket.util.Unsafe; import io.reactivex.subscribers.TestSubscriber; import org.junit.After; import org.junit.Ignore; @@ -225,8 +226,8 @@ public Publisher handleMetadataPush(Payload payload) { err -> err.printStackTrace()); // start both the server and client and monitor for errors - socketServer.startAndWait(); - socketClient.startAndWait(); + Unsafe.startAndWait(socketServer); + Unsafe.startAndWait(socketClient); } @After diff --git a/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/AvailabilityMetricReactiveSocket.java b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/AvailabilityMetricReactiveSocket.java index f43335e69..8834e7850 100644 --- a/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/AvailabilityMetricReactiveSocket.java +++ b/reactivesocket-stats-servo/src/main/java/io/reactivesocket/loadbalancer/servo/AvailabilityMetricReactiveSocket.java @@ -78,11 +78,6 @@ public void start(Completable c) { child.start(c); } - @Override - public void startAndWait() { - child.startAndWait(); - } - @Override public void onRequestReady(Consumer c) { child.onRequestReady(c); diff --git a/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/fireandforget/Fire.java b/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/fireandforget/Fire.java index 2b3e7db61..c9c332d80 100644 --- a/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/fireandforget/Fire.java +++ b/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/fireandforget/Fire.java @@ -23,6 +23,7 @@ import io.reactivesocket.aeron.client.AeronClientDuplexConnection; import io.reactivesocket.aeron.client.AeronClientDuplexConnectionFactory; import io.reactivesocket.aeron.client.FrameHolder; +import io.reactivesocket.util.Unsafe; import org.HdrHistogram.Recorder; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -62,8 +63,9 @@ public static void main(String... args) throws Exception { AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single(); System.out.println("Created duplex connection"); - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS)); - reactiveSocket.startAndWait(); + ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS); + ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, setupPayload); + Unsafe.startAndWait(reactiveSocket); CountDownLatch latch = new CountDownLatch(Integer.MAX_VALUE); diff --git a/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/requestreply/Ping.java b/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/requestreply/Ping.java index 7e7e7fa68..f37436007 100644 --- a/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/requestreply/Ping.java +++ b/reactivesocket-transport-aeron/src/examples/java/io/reactivesocket/aeron/example/requestreply/Ping.java @@ -22,6 +22,7 @@ import io.reactivesocket.aeron.client.AeronClientDuplexConnection; import io.reactivesocket.aeron.client.AeronClientDuplexConnectionFactory; import io.reactivesocket.aeron.client.FrameHolder; +import io.reactivesocket.util.Unsafe; import org.HdrHistogram.Recorder; import org.reactivestreams.Publisher; import rx.Observable; @@ -64,8 +65,9 @@ public static void main(String... args) throws Exception { AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single(); System.out.println("Created duplex connection"); - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS)); - reactiveSocket.startAndWait(); + ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS); + ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, setupPayload); + Unsafe.startAndWait(reactiveSocket); CountDownLatch latch = new CountDownLatch(Integer.MAX_VALUE); diff --git a/reactivesocket-transport-aeron/src/main/java/io/reactivesocket/aeron/server/ReactiveSocketAeronServer.java b/reactivesocket-transport-aeron/src/main/java/io/reactivesocket/aeron/server/ReactiveSocketAeronServer.java index 452158029..a38bd1c7b 100644 --- a/reactivesocket-transport-aeron/src/main/java/io/reactivesocket/aeron/server/ReactiveSocketAeronServer.java +++ b/reactivesocket-transport-aeron/src/main/java/io/reactivesocket/aeron/server/ReactiveSocketAeronServer.java @@ -29,6 +29,7 @@ import io.reactivesocket.aeron.internal.Loggable; import io.reactivesocket.aeron.internal.MessageType; import io.reactivesocket.rx.Observer; +import io.reactivesocket.util.Unsafe; import org.agrona.BitUtil; import org.agrona.DirectBuffer; import org.agrona.concurrent.UnsafeBuffer; @@ -174,7 +175,11 @@ public void accept(Throwable throwable) { sockets.put(sessionId, socket); - socket.startAndWait(); + try { + Unsafe.startAndWait(socket); + } catch (InterruptedException e) { + e.printStackTrace(); + } } else { debug("Unsupported stream id {}", streamId); } diff --git a/reactivesocket-transport-aeron/src/test/java/io/reactivesocket/aeron/client/ReactiveSocketAeronTest.java b/reactivesocket-transport-aeron/src/test/java/io/reactivesocket/aeron/client/ReactiveSocketAeronTest.java index 4ff58a8af..002369f10 100644 --- a/reactivesocket-transport-aeron/src/test/java/io/reactivesocket/aeron/client/ReactiveSocketAeronTest.java +++ b/reactivesocket-transport-aeron/src/test/java/io/reactivesocket/aeron/client/ReactiveSocketAeronTest.java @@ -26,6 +26,7 @@ import io.reactivesocket.aeron.server.ReactiveSocketAeronServer; import io.reactivesocket.exceptions.SetupException; import io.reactivesocket.test.TestUtil; +import io.reactivesocket.util.Unsafe; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; @@ -151,8 +152,9 @@ public Publisher apply(Payload payload) { AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single(); System.out.println("Created duplex connection"); - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS)); - reactiveSocket.startAndWait(); + ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS); + ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, setupPayload); + Unsafe.startAndWait(reactiveSocket); CountDownLatch latch = new CountDownLatch(count); @@ -225,8 +227,9 @@ public void requestStreamN(int count) throws Exception { AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single(); System.out.println("Created duplex connection"); - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS)); - reactiveSocket.startAndWait(); + ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS); + ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, setupPayload); + Unsafe.startAndWait(reactiveSocket); CountDownLatch latch = new CountDownLatch(count); Payload payload = TestUtil.utf8EncodedPayload("client_request", "client_metadata"); @@ -323,8 +326,9 @@ public Publisher handleMetadataPush(Payload payload) { AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single(); System.out.println("Created duplex connection => " + j); - ReactiveSocket client = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS)); - client.startAndWait(); + ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS); + ReactiveSocket client = DefaultReactiveSocket.fromClientConnection(connection, setupPayload); + Unsafe.startAndWait(client); Observable .range(1, 10) diff --git a/reactivesocket-transport-local/build.gradle b/reactivesocket-transport-local/build.gradle deleted file mode 100644 index 1c943599d..000000000 --- a/reactivesocket-transport-local/build.gradle +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2016 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -dependencies { - compile project(':reactivesocket-transport-tcp') - compile project(':reactivesocket-core') - - testCompile project(':reactivesocket-test') -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java deleted file mode 100644 index 86855f9cc..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.DuplexConnection; -import io.reactivesocket.Frame; -import io.reactivesocket.internal.EmptySubject; -import io.reactivesocket.rx.Completable; -import io.reactivesocket.rx.Observable; -import io.reactivesocket.rx.Observer; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.util.concurrent.CopyOnWriteArrayList; - -class LocalClientDuplexConnection implements DuplexConnection { - private final String name; - - private final CopyOnWriteArrayList> subjects; - private final EmptySubject closeSubject = new EmptySubject(); - - public LocalClientDuplexConnection(String name) { - this.name = name; - this.subjects = new CopyOnWriteArrayList<>(); - } - - @Override - public Observable getInput() { - return o -> { - o.onSubscribe(() -> subjects.removeIf(s -> s == o)); - subjects.add(o); - }; - } - - @Override - public void addOutput(Publisher o, Completable callback) { - o - .subscribe(new Subscriber() { - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Frame frame) { - try { - LocalReactiveSocketManager - .getInstance() - .getServerConnection(name) - .write(frame); - } catch (Throwable t) { - onError(t); - } - } - - @Override - public void onError(Throwable t) { - callback.error(t); - } - - @Override - public void onComplete() { - callback.success(); - } - }); - } - - @Override - public double availability() { - return 1.0; - } - - void write(Frame frame) { - subjects - .forEach(o -> o.onNext(frame)); - } - - @Override - public Publisher close() { - return s -> { - LocalReactiveSocketManager - .getInstance() - .removeClientConnection(name); - closeSubject.subscribe(s); - }; - } - - @Override - public Publisher onClose() { - return closeSubject; - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java deleted file mode 100644 index 43740f4b1..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.*; -import io.reactivesocket.internal.rx.EmptySubscription; -import org.reactivestreams.Publisher; - -public class LocalClientReactiveSocketConnector implements ReactiveSocketConnector { - public static final LocalClientReactiveSocketConnector INSTANCE = new LocalClientReactiveSocketConnector(); - - private LocalClientReactiveSocketConnector() {} - - @Override - public Publisher connect(Config config) { - return s -> { - try { - s.onSubscribe(EmptySubscription.INSTANCE); - LocalClientDuplexConnection clientConnection = LocalReactiveSocketManager - .getInstance() - .getClientConnection(config.getName()); - ReactiveSocket reactiveSocket = DefaultReactiveSocket - .fromClientConnection(clientConnection, ConnectionSetupPayload.create(config.getMetadataMimeType(), config.getDataMimeType())); - - reactiveSocket.startAndWait(); - - s.onNext(reactiveSocket); - s.onComplete(); - } catch (Throwable t) { - s.onError(t); - } - }; - } - - public static class Config { - final String name; - final String metadataMimeType; - final String dataMimeType; - - public Config(String name, String metadataMimeType, String dataMimeType) { - this.name = name; - this.metadataMimeType = metadataMimeType; - this.dataMimeType = dataMimeType; - } - - public String getName() { - return name; - } - - public String getMetadataMimeType() { - return metadataMimeType; - } - - public String getDataMimeType() { - return dataMimeType; - } - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java deleted file mode 100644 index 60d246266..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import java.util.concurrent.ConcurrentHashMap; - -/** - * Created by rroeser on 4/2/16. - */ -class LocalReactiveSocketManager { - private static final LocalReactiveSocketManager INSTANCE = new LocalReactiveSocketManager(); - - private final ConcurrentHashMap serverConnections; - private final ConcurrentHashMap clientConnections; - - private LocalReactiveSocketManager() { - serverConnections = new ConcurrentHashMap<>(); - clientConnections = new ConcurrentHashMap<>(); - } - - public static LocalReactiveSocketManager getInstance() { - return INSTANCE; - } - - public LocalClientDuplexConnection getClientConnection(String name) { - return clientConnections.computeIfAbsent(name, LocalClientDuplexConnection::new); - } - - public void removeClientConnection(String name) { - clientConnections.remove(name); - } - - public LocalServerDuplexConection getServerConnection(String name) { - return serverConnections.computeIfAbsent(name, LocalServerDuplexConection::new); - } - - public void removeServerDuplexConnection(String name) { - serverConnections.remove(name); - } - -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java deleted file mode 100644 index 9a3dde4d0..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.DuplexConnection; -import io.reactivesocket.Frame; -import io.reactivesocket.internal.EmptySubject; -import io.reactivesocket.rx.Completable; -import io.reactivesocket.rx.Observable; -import io.reactivesocket.rx.Observer; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.util.concurrent.CopyOnWriteArrayList; - -class LocalServerDuplexConection implements DuplexConnection { - private final String name; - - private final CopyOnWriteArrayList> subjects; - private final EmptySubject closeSubject = new EmptySubject(); - - public LocalServerDuplexConection(String name) { - this.name = name; - this.subjects = new CopyOnWriteArrayList<>(); - } - - @Override - public Observable getInput() { - return o -> { - o.onSubscribe(() -> subjects.removeIf(s -> s == o)); - subjects.add(o); - }; - } - - @Override - public void addOutput(Publisher o, Completable callback) { - o - .subscribe(new Subscriber() { - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Frame frame) { - try { - LocalReactiveSocketManager - .getInstance() - .getClientConnection(name) - .write(frame); - } catch (Throwable t) { - onError(t); - } - } - - @Override - public void onError(Throwable t) { - callback.error(t); - } - - @Override - public void onComplete() { - callback.success(); - } - }); - } - - @Override - public double availability() { - return 1.0; - } - - void write(Frame frame) { - subjects - .forEach(o -> o.onNext(frame)); - } - - @Override - public Publisher close() { - return s -> { - LocalReactiveSocketManager - .getInstance() - .removeServerDuplexConnection(name); - s.onComplete(); - closeSubject.onComplete(); - }; - } - - @Override - public Publisher onClose() { - return closeSubject; - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java deleted file mode 100644 index 58ad1d62b..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.*; -import io.reactivesocket.internal.rx.EmptySubscription; -import org.reactivestreams.Publisher; - -public class LocalServerReactiveSocketConnector implements ReactiveSocketConnector { - public static final LocalServerReactiveSocketConnector INSTANCE = new LocalServerReactiveSocketConnector(); - - private LocalServerReactiveSocketConnector() {} - - @Override - public Publisher connect(Config config) { - return s -> { - try { - s.onSubscribe(EmptySubscription.INSTANCE); - LocalServerDuplexConection clientConnection = LocalReactiveSocketManager - .getInstance() - .getServerConnection(config.getName()); - ReactiveSocket reactiveSocket = DefaultReactiveSocket - .fromServerConnection(clientConnection, config.getConnectionSetupHandler()); - - reactiveSocket.startAndWait(); - s.onNext(reactiveSocket); - s.onComplete(); - } catch (Throwable t) { - s.onError(t); - } - }; - } - - public static class Config { - final String name; - final ConnectionSetupHandler connectionSetupHandler; - - public Config(String name, ConnectionSetupHandler connectionSetupHandler) { - this.name = name; - this.connectionSetupHandler = connectionSetupHandler; - } - - public ConnectionSetupHandler getConnectionSetupHandler() { - return connectionSetupHandler; - } - - public String getName() { - return name; - } - } -} diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java deleted file mode 100644 index a62871e8f..000000000 --- a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.ConnectionSetupHandler; -import io.reactivesocket.ConnectionSetupPayload; -import io.reactivesocket.Payload; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.RequestHandler; -import io.reactivesocket.exceptions.SetupException; -import io.reactivesocket.test.TestUtil; -import org.junit.BeforeClass; -import org.junit.Test; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import rx.Observable; -import rx.RxReactiveStreams; -import rx.observers.TestSubscriber; - -import java.util.concurrent.TimeUnit; - -import static io.reactivesocket.util.Unsafe.toSingleFuture; - -public class ClientServerTest { - - static ReactiveSocket client; - - static ReactiveSocket server; - - @BeforeClass - public static void setup() throws Exception { - LocalServerReactiveSocketConnector.Config serverConfig = new LocalServerReactiveSocketConnector.Config("test", new ConnectionSetupHandler() { - @Override - public RequestHandler apply(ConnectionSetupPayload setupPayload, ReactiveSocket rs) throws SetupException { - return new RequestHandler() { - @Override - public Publisher handleRequestResponse(Payload payload) { - return s -> { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - s.onNext(response); - s.onComplete(); - }; - } - - @Override - public Publisher handleRequestStream(Payload payload) { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response)); - } - - @Override - public Publisher handleSubscription(Payload payload) { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response) - .repeat()); - } - - @Override - public Publisher handleFireAndForget(Payload payload) { - return Subscriber::onComplete; - } - - @Override - public Publisher handleChannel(Payload initialPayload, Publisher inputs) { - return null; - } - - @Override - public Publisher handleMetadataPush(Payload payload) { - return null; - } - }; - } - }); - - server = toSingleFuture(LocalServerReactiveSocketConnector.INSTANCE.connect(serverConfig)).get(5, TimeUnit.SECONDS); - - LocalClientReactiveSocketConnector.Config clientConfig = new LocalClientReactiveSocketConnector.Config("test", "text", "text"); - client = toSingleFuture(LocalClientReactiveSocketConnector.INSTANCE.connect(clientConfig)).get(5, TimeUnit.SECONDS);; - } - - @Test - public void testRequestResponse1() { - requestResponseN(1500, 1); - } - - @Test - public void testRequestResponse10() { - requestResponseN(1500, 10); - } - - - @Test - public void testRequestResponse100() { - requestResponseN(1500, 100); - } - - @Test - public void testRequestResponse10_000() { - requestResponseN(60_000, 10_000); - } - - - @Test - public void testRequestResponse100_000() { - requestResponseN(60_000, 10_000); - } - @Test - public void testRequestResponse1_000_000() { - requestResponseN(60_000, 10_000); - } - - @Test - public void testRequestStream() { - TestSubscriber ts = TestSubscriber.create(); - - RxReactiveStreams - .toObservable(client.requestStream(TestUtil.utf8EncodedPayload("hello", "metadata"))) - .subscribe(ts); - - - ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); - ts.assertValueCount(10); - ts.assertNoErrors(); - ts.assertCompleted(); - } - - @Test - public void testRequestSubscription() throws InterruptedException { - TestSubscriber ts = TestSubscriber.create(); - - RxReactiveStreams - .toObservable(client.requestSubscription(TestUtil.utf8EncodedPayload("hello sub", "metadata sub"))) - .take(10) - .subscribe(ts); - - ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); - ts.assertValueCount(10); - ts.assertNoErrors(); - } - - - public void requestResponseN(int timeout, int count) { - - TestSubscriber ts = TestSubscriber.create(); - - Observable - .range(1, count) - .flatMap(i -> - RxReactiveStreams - .toObservable(client.requestResponse(TestUtil.utf8EncodedPayload("hello", "metadata"))) - .map(payload -> TestUtil.byteToString(payload.getData())) - ) - .doOnError(Throwable::printStackTrace) - .subscribe(ts); - - ts.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS); - ts.assertValueCount(count); - ts.assertNoErrors(); - ts.assertCompleted(); - } - - -} \ No newline at end of file diff --git a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java index 221c2ae2e..f3d404824 100644 --- a/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java +++ b/reactivesocket-transport-websocket/src/main/java/io/reactivesocket/transport/websocket/server/ReactiveSocketServerHandler.java @@ -25,6 +25,7 @@ import io.reactivesocket.LeaseGovernor; import io.reactivesocket.ReactiveSocket; import io.reactivesocket.transport.tcp.MutableDirectByteBuf; +import io.reactivesocket.util.Unsafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromServerConnection(connection, setupHandler, leaseGovernor, Throwable::printStackTrace); // Note: No blocking code here (still it should be refactored) - reactiveSocket.startAndWait(); + Unsafe.startAndWait(reactiveSocket); } @Override diff --git a/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/ClientServerTest.java b/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/ClientServerTest.java index ff239709c..d184de6a1 100644 --- a/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/ClientServerTest.java +++ b/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/ClientServerTest.java @@ -35,6 +35,7 @@ import io.reactivesocket.transport.websocket.client.ClientWebSocketDuplexConnection; import io.reactivesocket.transport.websocket.server.ReactiveSocketServerHandler; import io.reactivesocket.test.TestUtil; +import io.reactivesocket.util.Unsafe; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -131,8 +132,7 @@ protected void initChannel(Channel ch) throws Exception { client = DefaultReactiveSocket .fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), t -> t.printStackTrace()); - client.startAndWait(); - + Unsafe.startAndWait(client); } @AfterClass diff --git a/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Ping.java b/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Ping.java index 52e860986..8241f753d 100644 --- a/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Ping.java +++ b/reactivesocket-transport-websocket/src/test/java/io/reactivesocket/transport/websocket/Ping.java @@ -21,6 +21,7 @@ import io.reactivesocket.Payload; import io.reactivesocket.ReactiveSocket; import io.reactivesocket.transport.websocket.client.ClientWebSocketDuplexConnection; +import io.reactivesocket.util.Unsafe; import org.HdrHistogram.Recorder; import org.reactivestreams.Publisher; import rx.Observable; @@ -47,7 +48,7 @@ public static void main(String... args) throws Exception { ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(duplexConnection, setupPayload, Throwable::printStackTrace); - reactiveSocket.startAndWait(); + Unsafe.startAndWait(reactiveSocket); byte[] data = "hello".getBytes(StandardCharsets.UTF_8); diff --git a/settings.gradle b/settings.gradle index 14e459afc..1411e1476 100644 --- a/settings.gradle +++ b/settings.gradle @@ -7,6 +7,5 @@ include 'reactivesocket-mime-types' include 'reactivesocket-stats-servo' include 'reactivesocket-test' include 'reactivesocket-transport-aeron' -include 'reactivesocket-transport-local' include 'reactivesocket-transport-tcp' include 'reactivesocket-transport-websocket' From 13392a535b4b79b1ce167aaeb26977874f7d9043 Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Wed, 13 Jul 2016 11:27:50 -0700 Subject: [PATCH 2/2] Add back local-transport --- reactivesocket-transport-local/build.gradle | 19 ++ .../local/LocalClientDuplexConnection.java | 107 ++++++++++ .../LocalClientReactiveSocketConnector.java | 72 +++++++ .../local/LocalReactiveSocketManager.java | 54 +++++ .../local/LocalServerDuplexConection.java | 108 ++++++++++ .../LocalServerReactiveSocketConnector.java | 65 ++++++ .../local/ClientServerTest.java | 185 ++++++++++++++++++ settings.gradle | 1 + 8 files changed, 611 insertions(+) create mode 100644 reactivesocket-transport-local/build.gradle create mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java create mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java create mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java create mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java create mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java create mode 100644 reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java diff --git a/reactivesocket-transport-local/build.gradle b/reactivesocket-transport-local/build.gradle new file mode 100644 index 000000000..1c943599d --- /dev/null +++ b/reactivesocket-transport-local/build.gradle @@ -0,0 +1,19 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +dependencies { + compile project(':reactivesocket-transport-tcp') + compile project(':reactivesocket-core') + + testCompile project(':reactivesocket-test') +} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java new file mode 100644 index 000000000..86855f9cc --- /dev/null +++ b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java @@ -0,0 +1,107 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.local; + +import io.reactivesocket.DuplexConnection; +import io.reactivesocket.Frame; +import io.reactivesocket.internal.EmptySubject; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Observable; +import io.reactivesocket.rx.Observer; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.CopyOnWriteArrayList; + +class LocalClientDuplexConnection implements DuplexConnection { + private final String name; + + private final CopyOnWriteArrayList> subjects; + private final EmptySubject closeSubject = new EmptySubject(); + + public LocalClientDuplexConnection(String name) { + this.name = name; + this.subjects = new CopyOnWriteArrayList<>(); + } + + @Override + public Observable getInput() { + return o -> { + o.onSubscribe(() -> subjects.removeIf(s -> s == o)); + subjects.add(o); + }; + } + + @Override + public void addOutput(Publisher o, Completable callback) { + o + .subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Frame frame) { + try { + LocalReactiveSocketManager + .getInstance() + .getServerConnection(name) + .write(frame); + } catch (Throwable t) { + onError(t); + } + } + + @Override + public void onError(Throwable t) { + callback.error(t); + } + + @Override + public void onComplete() { + callback.success(); + } + }); + } + + @Override + public double availability() { + return 1.0; + } + + void write(Frame frame) { + subjects + .forEach(o -> o.onNext(frame)); + } + + @Override + public Publisher close() { + return s -> { + LocalReactiveSocketManager + .getInstance() + .removeClientConnection(name); + closeSubject.subscribe(s); + }; + } + + @Override + public Publisher onClose() { + return closeSubject; + } +} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java new file mode 100644 index 000000000..d2bcce038 --- /dev/null +++ b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java @@ -0,0 +1,72 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.local; + +import io.reactivesocket.*; +import io.reactivesocket.internal.rx.EmptySubscription; +import io.reactivesocket.util.Unsafe; +import org.reactivestreams.Publisher; + +public class LocalClientReactiveSocketConnector implements ReactiveSocketConnector { + public static final LocalClientReactiveSocketConnector INSTANCE = new LocalClientReactiveSocketConnector(); + + private LocalClientReactiveSocketConnector() {} + + @Override + public Publisher connect(Config config) { + return s -> { + try { + s.onSubscribe(EmptySubscription.INSTANCE); + LocalClientDuplexConnection clientConnection = LocalReactiveSocketManager + .getInstance() + .getClientConnection(config.getName()); + ReactiveSocket reactiveSocket = DefaultReactiveSocket + .fromClientConnection(clientConnection, ConnectionSetupPayload.create(config.getMetadataMimeType(), config.getDataMimeType())); + + Unsafe.startAndWait(reactiveSocket); + + s.onNext(reactiveSocket); + s.onComplete(); + } catch (Throwable t) { + s.onError(t); + } + }; + } + + public static class Config { + final String name; + final String metadataMimeType; + final String dataMimeType; + + public Config(String name, String metadataMimeType, String dataMimeType) { + this.name = name; + this.metadataMimeType = metadataMimeType; + this.dataMimeType = dataMimeType; + } + + public String getName() { + return name; + } + + public String getMetadataMimeType() { + return metadataMimeType; + } + + public String getDataMimeType() { + return dataMimeType; + } + } +} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java new file mode 100644 index 000000000..60d246266 --- /dev/null +++ b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java @@ -0,0 +1,54 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.local; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * Created by rroeser on 4/2/16. + */ +class LocalReactiveSocketManager { + private static final LocalReactiveSocketManager INSTANCE = new LocalReactiveSocketManager(); + + private final ConcurrentHashMap serverConnections; + private final ConcurrentHashMap clientConnections; + + private LocalReactiveSocketManager() { + serverConnections = new ConcurrentHashMap<>(); + clientConnections = new ConcurrentHashMap<>(); + } + + public static LocalReactiveSocketManager getInstance() { + return INSTANCE; + } + + public LocalClientDuplexConnection getClientConnection(String name) { + return clientConnections.computeIfAbsent(name, LocalClientDuplexConnection::new); + } + + public void removeClientConnection(String name) { + clientConnections.remove(name); + } + + public LocalServerDuplexConection getServerConnection(String name) { + return serverConnections.computeIfAbsent(name, LocalServerDuplexConection::new); + } + + public void removeServerDuplexConnection(String name) { + serverConnections.remove(name); + } + +} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java new file mode 100644 index 000000000..9a3dde4d0 --- /dev/null +++ b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java @@ -0,0 +1,108 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.local; + +import io.reactivesocket.DuplexConnection; +import io.reactivesocket.Frame; +import io.reactivesocket.internal.EmptySubject; +import io.reactivesocket.rx.Completable; +import io.reactivesocket.rx.Observable; +import io.reactivesocket.rx.Observer; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.concurrent.CopyOnWriteArrayList; + +class LocalServerDuplexConection implements DuplexConnection { + private final String name; + + private final CopyOnWriteArrayList> subjects; + private final EmptySubject closeSubject = new EmptySubject(); + + public LocalServerDuplexConection(String name) { + this.name = name; + this.subjects = new CopyOnWriteArrayList<>(); + } + + @Override + public Observable getInput() { + return o -> { + o.onSubscribe(() -> subjects.removeIf(s -> s == o)); + subjects.add(o); + }; + } + + @Override + public void addOutput(Publisher o, Completable callback) { + o + .subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Frame frame) { + try { + LocalReactiveSocketManager + .getInstance() + .getClientConnection(name) + .write(frame); + } catch (Throwable t) { + onError(t); + } + } + + @Override + public void onError(Throwable t) { + callback.error(t); + } + + @Override + public void onComplete() { + callback.success(); + } + }); + } + + @Override + public double availability() { + return 1.0; + } + + void write(Frame frame) { + subjects + .forEach(o -> o.onNext(frame)); + } + + @Override + public Publisher close() { + return s -> { + LocalReactiveSocketManager + .getInstance() + .removeServerDuplexConnection(name); + s.onComplete(); + closeSubject.onComplete(); + }; + } + + @Override + public Publisher onClose() { + return closeSubject; + } +} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java new file mode 100644 index 000000000..984618394 --- /dev/null +++ b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java @@ -0,0 +1,65 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.local; + +import io.reactivesocket.*; +import io.reactivesocket.internal.rx.EmptySubscription; +import io.reactivesocket.util.Unsafe; +import org.reactivestreams.Publisher; + +public class LocalServerReactiveSocketConnector implements ReactiveSocketConnector { + public static final LocalServerReactiveSocketConnector INSTANCE = new LocalServerReactiveSocketConnector(); + + private LocalServerReactiveSocketConnector() {} + + @Override + public Publisher connect(Config config) { + return s -> { + try { + s.onSubscribe(EmptySubscription.INSTANCE); + LocalServerDuplexConection clientConnection = LocalReactiveSocketManager + .getInstance() + .getServerConnection(config.getName()); + ReactiveSocket reactiveSocket = DefaultReactiveSocket + .fromServerConnection(clientConnection, config.getConnectionSetupHandler()); + + Unsafe.startAndWait(reactiveSocket); + s.onNext(reactiveSocket); + s.onComplete(); + } catch (Throwable t) { + s.onError(t); + } + }; + } + + public static class Config { + final String name; + final ConnectionSetupHandler connectionSetupHandler; + + public Config(String name, ConnectionSetupHandler connectionSetupHandler) { + this.name = name; + this.connectionSetupHandler = connectionSetupHandler; + } + + public ConnectionSetupHandler getConnectionSetupHandler() { + return connectionSetupHandler; + } + + public String getName() { + return name; + } + } +} diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java new file mode 100644 index 000000000..a62871e8f --- /dev/null +++ b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java @@ -0,0 +1,185 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivesocket.local; + +import io.reactivesocket.ConnectionSetupHandler; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.RequestHandler; +import io.reactivesocket.exceptions.SetupException; +import io.reactivesocket.test.TestUtil; +import org.junit.BeforeClass; +import org.junit.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import rx.Observable; +import rx.RxReactiveStreams; +import rx.observers.TestSubscriber; + +import java.util.concurrent.TimeUnit; + +import static io.reactivesocket.util.Unsafe.toSingleFuture; + +public class ClientServerTest { + + static ReactiveSocket client; + + static ReactiveSocket server; + + @BeforeClass + public static void setup() throws Exception { + LocalServerReactiveSocketConnector.Config serverConfig = new LocalServerReactiveSocketConnector.Config("test", new ConnectionSetupHandler() { + @Override + public RequestHandler apply(ConnectionSetupPayload setupPayload, ReactiveSocket rs) throws SetupException { + return new RequestHandler() { + @Override + public Publisher handleRequestResponse(Payload payload) { + return s -> { + Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); + s.onNext(response); + s.onComplete(); + }; + } + + @Override + public Publisher handleRequestStream(Payload payload) { + Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); + + return RxReactiveStreams + .toPublisher(Observable + .range(1, 10) + .map(i -> response)); + } + + @Override + public Publisher handleSubscription(Payload payload) { + Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); + + return RxReactiveStreams + .toPublisher(Observable + .range(1, 10) + .map(i -> response) + .repeat()); + } + + @Override + public Publisher handleFireAndForget(Payload payload) { + return Subscriber::onComplete; + } + + @Override + public Publisher handleChannel(Payload initialPayload, Publisher inputs) { + return null; + } + + @Override + public Publisher handleMetadataPush(Payload payload) { + return null; + } + }; + } + }); + + server = toSingleFuture(LocalServerReactiveSocketConnector.INSTANCE.connect(serverConfig)).get(5, TimeUnit.SECONDS); + + LocalClientReactiveSocketConnector.Config clientConfig = new LocalClientReactiveSocketConnector.Config("test", "text", "text"); + client = toSingleFuture(LocalClientReactiveSocketConnector.INSTANCE.connect(clientConfig)).get(5, TimeUnit.SECONDS);; + } + + @Test + public void testRequestResponse1() { + requestResponseN(1500, 1); + } + + @Test + public void testRequestResponse10() { + requestResponseN(1500, 10); + } + + + @Test + public void testRequestResponse100() { + requestResponseN(1500, 100); + } + + @Test + public void testRequestResponse10_000() { + requestResponseN(60_000, 10_000); + } + + + @Test + public void testRequestResponse100_000() { + requestResponseN(60_000, 10_000); + } + @Test + public void testRequestResponse1_000_000() { + requestResponseN(60_000, 10_000); + } + + @Test + public void testRequestStream() { + TestSubscriber ts = TestSubscriber.create(); + + RxReactiveStreams + .toObservable(client.requestStream(TestUtil.utf8EncodedPayload("hello", "metadata"))) + .subscribe(ts); + + + ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); + ts.assertValueCount(10); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void testRequestSubscription() throws InterruptedException { + TestSubscriber ts = TestSubscriber.create(); + + RxReactiveStreams + .toObservable(client.requestSubscription(TestUtil.utf8EncodedPayload("hello sub", "metadata sub"))) + .take(10) + .subscribe(ts); + + ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); + ts.assertValueCount(10); + ts.assertNoErrors(); + } + + + public void requestResponseN(int timeout, int count) { + + TestSubscriber ts = TestSubscriber.create(); + + Observable + .range(1, count) + .flatMap(i -> + RxReactiveStreams + .toObservable(client.requestResponse(TestUtil.utf8EncodedPayload("hello", "metadata"))) + .map(payload -> TestUtil.byteToString(payload.getData())) + ) + .doOnError(Throwable::printStackTrace) + .subscribe(ts); + + ts.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS); + ts.assertValueCount(count); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 1411e1476..14e459afc 100644 --- a/settings.gradle +++ b/settings.gradle @@ -7,5 +7,6 @@ include 'reactivesocket-mime-types' include 'reactivesocket-stats-servo' include 'reactivesocket-test' include 'reactivesocket-transport-aeron' +include 'reactivesocket-transport-local' include 'reactivesocket-transport-tcp' include 'reactivesocket-transport-websocket'