diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java deleted file mode 100644 index bafdd3028..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.DuplexConnection; -import io.reactivesocket.Frame; -import io.reactivesocket.rx.Completable; -import io.reactivesocket.rx.Observable; -import io.reactivesocket.rx.Observer; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.io.IOException; -import java.util.concurrent.CopyOnWriteArrayList; - -class LocalClientDuplexConnection implements DuplexConnection { - private final String name; - - private final CopyOnWriteArrayList> subjects; - - public LocalClientDuplexConnection(String name) { - this.name = name; - this.subjects = new CopyOnWriteArrayList<>(); - } - - @Override - public Observable getInput() { - return o -> { - o.onSubscribe(() -> subjects.removeIf(s -> s == o)); - subjects.add(o); - }; - } - - @Override - public void addOutput(Publisher o, Completable callback) { - o - .subscribe(new Subscriber() { - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Frame frame) { - try { - LocalReactiveSocketManager - .getInstance() - .getServerConnection(name) - .write(frame); - } catch (Throwable t) { - onError(t); - } - } - - @Override - public void onError(Throwable t) { - callback.error(t); - } - - @Override - public void onComplete() { - callback.success(); - } - }); - } - - @Override - public double availability() { - return 1.0; - } - - void write(Frame frame) { - subjects - .forEach(o -> o.onNext(frame)); - } - - @Override - public void close() throws IOException { - LocalReactiveSocketManager - .getInstance() - .removeClientConnection(name); - - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java deleted file mode 100644 index 43740f4b1..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.*; -import io.reactivesocket.internal.rx.EmptySubscription; -import org.reactivestreams.Publisher; - -public class LocalClientReactiveSocketConnector implements ReactiveSocketConnector { - public static final LocalClientReactiveSocketConnector INSTANCE = new LocalClientReactiveSocketConnector(); - - private LocalClientReactiveSocketConnector() {} - - @Override - public Publisher connect(Config config) { - return s -> { - try { - s.onSubscribe(EmptySubscription.INSTANCE); - LocalClientDuplexConnection clientConnection = LocalReactiveSocketManager - .getInstance() - .getClientConnection(config.getName()); - ReactiveSocket reactiveSocket = DefaultReactiveSocket - .fromClientConnection(clientConnection, ConnectionSetupPayload.create(config.getMetadataMimeType(), config.getDataMimeType())); - - reactiveSocket.startAndWait(); - - s.onNext(reactiveSocket); - s.onComplete(); - } catch (Throwable t) { - s.onError(t); - } - }; - } - - public static class Config { - final String name; - final String metadataMimeType; - final String dataMimeType; - - public Config(String name, String metadataMimeType, String dataMimeType) { - this.name = name; - this.metadataMimeType = metadataMimeType; - this.dataMimeType = dataMimeType; - } - - public String getName() { - return name; - } - - public String getMetadataMimeType() { - return metadataMimeType; - } - - public String getDataMimeType() { - return dataMimeType; - } - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java deleted file mode 100644 index 60d246266..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import java.util.concurrent.ConcurrentHashMap; - -/** - * Created by rroeser on 4/2/16. - */ -class LocalReactiveSocketManager { - private static final LocalReactiveSocketManager INSTANCE = new LocalReactiveSocketManager(); - - private final ConcurrentHashMap serverConnections; - private final ConcurrentHashMap clientConnections; - - private LocalReactiveSocketManager() { - serverConnections = new ConcurrentHashMap<>(); - clientConnections = new ConcurrentHashMap<>(); - } - - public static LocalReactiveSocketManager getInstance() { - return INSTANCE; - } - - public LocalClientDuplexConnection getClientConnection(String name) { - return clientConnections.computeIfAbsent(name, LocalClientDuplexConnection::new); - } - - public void removeClientConnection(String name) { - clientConnections.remove(name); - } - - public LocalServerDuplexConection getServerConnection(String name) { - return serverConnections.computeIfAbsent(name, LocalServerDuplexConection::new); - } - - public void removeServerDuplexConnection(String name) { - serverConnections.remove(name); - } - -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java deleted file mode 100644 index baaf3800d..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.DuplexConnection; -import io.reactivesocket.Frame; -import io.reactivesocket.rx.Completable; -import io.reactivesocket.rx.Observable; -import io.reactivesocket.rx.Observer; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.io.IOException; -import java.util.concurrent.CopyOnWriteArrayList; - -class LocalServerDuplexConection implements DuplexConnection { - private final String name; - - private final CopyOnWriteArrayList> subjects; - - public LocalServerDuplexConection(String name) { - this.name = name; - this.subjects = new CopyOnWriteArrayList<>(); - } - - @Override - public Observable getInput() { - return o -> { - o.onSubscribe(() -> subjects.removeIf(s -> s == o)); - subjects.add(o); - }; - } - - @Override - public void addOutput(Publisher o, Completable callback) { - o - .subscribe(new Subscriber() { - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Frame frame) { - try { - LocalReactiveSocketManager - .getInstance() - .getClientConnection(name) - .write(frame); - } catch (Throwable t) { - onError(t); - } - } - - @Override - public void onError(Throwable t) { - callback.error(t); - } - - @Override - public void onComplete() { - callback.success(); - } - }); - } - - @Override - public double availability() { - return 1.0; - } - - void write(Frame frame) { - subjects - .forEach(o -> o.onNext(frame)); - } - - @Override - public void close() throws IOException { - LocalReactiveSocketManager - .getInstance() - .removeServerDuplexConnection(name); - - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java deleted file mode 100644 index 58ad1d62b..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.*; -import io.reactivesocket.internal.rx.EmptySubscription; -import org.reactivestreams.Publisher; - -public class LocalServerReactiveSocketConnector implements ReactiveSocketConnector { - public static final LocalServerReactiveSocketConnector INSTANCE = new LocalServerReactiveSocketConnector(); - - private LocalServerReactiveSocketConnector() {} - - @Override - public Publisher connect(Config config) { - return s -> { - try { - s.onSubscribe(EmptySubscription.INSTANCE); - LocalServerDuplexConection clientConnection = LocalReactiveSocketManager - .getInstance() - .getServerConnection(config.getName()); - ReactiveSocket reactiveSocket = DefaultReactiveSocket - .fromServerConnection(clientConnection, config.getConnectionSetupHandler()); - - reactiveSocket.startAndWait(); - s.onNext(reactiveSocket); - s.onComplete(); - } catch (Throwable t) { - s.onError(t); - } - }; - } - - public static class Config { - final String name; - final ConnectionSetupHandler connectionSetupHandler; - - public Config(String name, ConnectionSetupHandler connectionSetupHandler) { - this.name = name; - this.connectionSetupHandler = connectionSetupHandler; - } - - public ConnectionSetupHandler getConnectionSetupHandler() { - return connectionSetupHandler; - } - - public String getName() { - return name; - } - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/client/LocalReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/client/LocalReactiveSocketConnector.java new file mode 100644 index 000000000..8a836db3c --- /dev/null +++ b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/client/LocalReactiveSocketConnector.java @@ -0,0 +1,63 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.local.client; + +import io.netty.channel.local.LocalChannel; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.Frame; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.ReactiveSocketConnector; +import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector; +import io.reactivex.netty.RxNetty; +import io.reactivex.netty.protocol.tcp.client.TcpClient; +import org.reactivestreams.Publisher; + +import java.net.SocketAddress; +import java.util.function.Consumer; +import java.util.function.Function; + +public class LocalReactiveSocketConnector implements ReactiveSocketConnector { + + private final TcpReactiveSocketConnector delegate; + + private LocalReactiveSocketConnector(TcpReactiveSocketConnector delegate) { + this.delegate = delegate; + } + + @Override + public Publisher connect(SocketAddress address) { + return delegate.connect(address); + } + /** + * Configures the underlying {@link TcpClient} used by this connector. + * + * @param configurator Function to transform the client. + * + * @return A new {@link LocalReactiveSocketConnector} + */ + public LocalReactiveSocketConnector configureClient( + Function, TcpClient> configurator) { + return new LocalReactiveSocketConnector(delegate.configureClient(configurator)); + } + + public static LocalReactiveSocketConnector create(ConnectionSetupPayload setupPayload, + Consumer errorStream) { + TcpReactiveSocketConnector delegate = + TcpReactiveSocketConnector.create(setupPayload, errorStream, socketAddress -> { + return TcpClient.newClient(socketAddress, RxNetty.getRxEventLoopProvider().globalClientEventLoop(), + LocalChannel.class); + }); + return new LocalReactiveSocketConnector(delegate); + } +} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/server/LocalReactiveSocketServer.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/server/LocalReactiveSocketServer.java new file mode 100644 index 000000000..372ec3313 --- /dev/null +++ b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/server/LocalReactiveSocketServer.java @@ -0,0 +1,89 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.local.server; + +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalServerChannel; +import io.reactivesocket.ConnectionSetupHandler; +import io.reactivesocket.Frame; +import io.reactivesocket.LeaseGovernor; +import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer; +import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer.StartedServer; +import io.reactivex.netty.RxNetty; +import io.reactivex.netty.protocol.tcp.server.TcpServer; + +import java.util.function.Function; + +/** + * A server using local jvm sockets. + */ +public class LocalReactiveSocketServer { + + private final TcpReactiveSocketServer delegate; + + private LocalReactiveSocketServer(TcpReactiveSocketServer delegate) { + this.delegate = delegate; + } + + /** + * Starts this server and uses the passed {@code setupHandler} to setup accepted connection. + * + * @param setupHandler Setup handler for connections. + * + * @return A handle for the started server. + */ + public StartedServer start(ConnectionSetupHandler setupHandler) { + return start(setupHandler, LeaseGovernor.UNLIMITED_LEASE_GOVERNOR); + } + + /** + * Starts this server and uses the passed {@code setupHandler} and {@code leaseGovernor} to setup accepted + * connection. + * + * @param setupHandler Setup handler for connections. + * @param leaseGovernor To manage leases. + * + * @return A handle for the started server. + */ + public StartedServer start(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) { + return delegate.start(setupHandler, leaseGovernor); + } + + /** + * Configures the underlying server using the passed {@code configurator}. + * + * @param configurator Function to transform the underlying server. + * + * @return New instance of {@code LocalReactiveSocketServer}. + */ + public LocalReactiveSocketServer configureServer( + Function, TcpServer> configurator) { + return new LocalReactiveSocketServer(delegate.configureServer(configurator)); + } + + /** + * Creates a new local transport server with the passed {@code id}. + * + * @param id A unique identifier within the JVM. + * + * @return A new {@code {@link LocalReactiveSocketServer} + */ + public static LocalReactiveSocketServer create(String id) { + TcpReactiveSocketServer delegate = + TcpReactiveSocketServer.create(TcpServer.newServer(new LocalAddress(id), + RxNetty.getRxEventLoopProvider().globalServerEventLoop(), + LocalServerChannel.class)); + return new LocalReactiveSocketServer(delegate); + } +} diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java deleted file mode 100644 index a62871e8f..000000000 --- a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.ConnectionSetupHandler; -import io.reactivesocket.ConnectionSetupPayload; -import io.reactivesocket.Payload; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.RequestHandler; -import io.reactivesocket.exceptions.SetupException; -import io.reactivesocket.test.TestUtil; -import org.junit.BeforeClass; -import org.junit.Test; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import rx.Observable; -import rx.RxReactiveStreams; -import rx.observers.TestSubscriber; - -import java.util.concurrent.TimeUnit; - -import static io.reactivesocket.util.Unsafe.toSingleFuture; - -public class ClientServerTest { - - static ReactiveSocket client; - - static ReactiveSocket server; - - @BeforeClass - public static void setup() throws Exception { - LocalServerReactiveSocketConnector.Config serverConfig = new LocalServerReactiveSocketConnector.Config("test", new ConnectionSetupHandler() { - @Override - public RequestHandler apply(ConnectionSetupPayload setupPayload, ReactiveSocket rs) throws SetupException { - return new RequestHandler() { - @Override - public Publisher handleRequestResponse(Payload payload) { - return s -> { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - s.onNext(response); - s.onComplete(); - }; - } - - @Override - public Publisher handleRequestStream(Payload payload) { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response)); - } - - @Override - public Publisher handleSubscription(Payload payload) { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response) - .repeat()); - } - - @Override - public Publisher handleFireAndForget(Payload payload) { - return Subscriber::onComplete; - } - - @Override - public Publisher handleChannel(Payload initialPayload, Publisher inputs) { - return null; - } - - @Override - public Publisher handleMetadataPush(Payload payload) { - return null; - } - }; - } - }); - - server = toSingleFuture(LocalServerReactiveSocketConnector.INSTANCE.connect(serverConfig)).get(5, TimeUnit.SECONDS); - - LocalClientReactiveSocketConnector.Config clientConfig = new LocalClientReactiveSocketConnector.Config("test", "text", "text"); - client = toSingleFuture(LocalClientReactiveSocketConnector.INSTANCE.connect(clientConfig)).get(5, TimeUnit.SECONDS);; - } - - @Test - public void testRequestResponse1() { - requestResponseN(1500, 1); - } - - @Test - public void testRequestResponse10() { - requestResponseN(1500, 10); - } - - - @Test - public void testRequestResponse100() { - requestResponseN(1500, 100); - } - - @Test - public void testRequestResponse10_000() { - requestResponseN(60_000, 10_000); - } - - - @Test - public void testRequestResponse100_000() { - requestResponseN(60_000, 10_000); - } - @Test - public void testRequestResponse1_000_000() { - requestResponseN(60_000, 10_000); - } - - @Test - public void testRequestStream() { - TestSubscriber ts = TestSubscriber.create(); - - RxReactiveStreams - .toObservable(client.requestStream(TestUtil.utf8EncodedPayload("hello", "metadata"))) - .subscribe(ts); - - - ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); - ts.assertValueCount(10); - ts.assertNoErrors(); - ts.assertCompleted(); - } - - @Test - public void testRequestSubscription() throws InterruptedException { - TestSubscriber ts = TestSubscriber.create(); - - RxReactiveStreams - .toObservable(client.requestSubscription(TestUtil.utf8EncodedPayload("hello sub", "metadata sub"))) - .take(10) - .subscribe(ts); - - ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); - ts.assertValueCount(10); - ts.assertNoErrors(); - } - - - public void requestResponseN(int timeout, int count) { - - TestSubscriber ts = TestSubscriber.create(); - - Observable - .range(1, count) - .flatMap(i -> - RxReactiveStreams - .toObservable(client.requestResponse(TestUtil.utf8EncodedPayload("hello", "metadata"))) - .map(payload -> TestUtil.byteToString(payload.getData())) - ) - .doOnError(Throwable::printStackTrace) - .subscribe(ts); - - ts.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS); - ts.assertValueCount(count); - ts.assertNoErrors(); - ts.assertCompleted(); - } - - -} \ No newline at end of file diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientServerTest.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientServerTest.java new file mode 100644 index 000000000..89445eb91 --- /dev/null +++ b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientServerTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.local; + +import org.junit.Rule; +import org.junit.Test; + +public class LocalClientServerTest { + + @Rule + public final LocalClientSetupRule setup = new LocalClientSetupRule(); + + @Test(timeout = 60000) + public void testRequestResponse1() { + setup.testRequestResponseN(1); + } + + @Test(timeout = 60000) + public void testRequestResponse10() { + setup.testRequestResponseN(10); + } + + + @Test(timeout = 60000) + public void testRequestResponse100() { + setup.testRequestResponseN(100); + } + + @Test(timeout = 60000) + public void testRequestResponse10_000() { + setup.testRequestResponseN(10_000); + } + + @Test(timeout = 60000) + public void testRequestStream() { + setup.testRequestStream(); + } + + @Test(timeout = 60000) + public void testRequestSubscription() throws InterruptedException { + setup.testRequestSubscription(); + } +} \ No newline at end of file diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientSetupRule.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientSetupRule.java new file mode 100644 index 000000000..26ef0a895 --- /dev/null +++ b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientSetupRule.java @@ -0,0 +1,52 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.local; + +import io.netty.util.internal.ThreadLocalRandom; +import io.reactivesocket.ConnectionSetupHandler; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.RequestHandler; +import io.reactivesocket.local.client.LocalReactiveSocketConnector; +import io.reactivesocket.local.server.LocalReactiveSocketServer; +import io.reactivesocket.test.ClientSetupRule; +import io.reactivesocket.test.TestRequestHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.netty.handler.logging.LogLevel.*; + +public class LocalClientSetupRule extends ClientSetupRule { + + private static final Logger logger = LoggerFactory.getLogger(LocalClientSetupRule.class); + + public LocalClientSetupRule() { + super(LocalReactiveSocketConnector.create(ConnectionSetupPayload.create("", ""), Throwable::printStackTrace) + .configureClient(client -> client.enableWireLogging("test-client", DEBUG)), + () -> { + String id = "test-" + ThreadLocalRandom.current().nextInt(100); + logger.info("Local socket id: " + id); + + return LocalReactiveSocketServer.create(id) + .configureServer(server -> server.enableWireLogging(id, DEBUG)) + .start(new ConnectionSetupHandler() { + @Override + public RequestHandler apply(ConnectionSetupPayload s, ReactiveSocket rs) { + return new TestRequestHandler(); + } + }).getServerAddress(); + }); + } + +} diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalPing.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalPing.java new file mode 100644 index 000000000..83ea466c2 --- /dev/null +++ b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalPing.java @@ -0,0 +1,48 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.local; + +import io.netty.channel.local.LocalAddress; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.local.client.LocalReactiveSocketConnector; +import io.reactivesocket.local.server.LocalReactiveSocketServer; +import io.reactivesocket.test.PingClient; +import io.reactivesocket.test.PingHandler; +import org.HdrHistogram.Recorder; + +import java.util.concurrent.TimeUnit; + +public final class LocalPing { + + public static final String SERVER_ID = "local-pong"; + + public static void main(String... args) throws Exception { + + LocalReactiveSocketServer.create(SERVER_ID) + .start(new PingHandler()); + + ConnectionSetupPayload payload = ConnectionSetupPayload.create("", ""); + LocalReactiveSocketConnector connector = LocalReactiveSocketConnector.create(payload, + Throwable::printStackTrace); + PingClient pingClient = new PingClient(connector); + Recorder recorder = pingClient.startTracker(1, TimeUnit.SECONDS); + final int count = 1_000_000; + pingClient.connect(new LocalAddress(SERVER_ID)) + .startPingPong(count, recorder) + .doOnTerminate(() -> { + System.out.println("Sent " + count + " messages."); + }) + .toBlocking() + .last(); + } +}