From 82cfd57d5336a7ce9ae1f0174b74e6d7924dc0c9 Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Sun, 26 Jun 2016 22:22:00 -0700 Subject: [PATCH] Local transport now uses tcp. Problem Local transport was using a custom local socket implementation. Netty provides a `LocalChannel` abstraction that can be used. Modification Modified reactivesocket-transport-local` to use `reactivesocket-transport-tcp`. Refactored common test classes to `reactivesocket-test` module so they can be used in both tcp and local transport without code duplication. --- .../util/ObserverSubscriber.java | 42 ++++ .../io/reactivesocket/util/PayloadImpl.java | 80 ++++++++ reactivesocket-test/build.gradle | 15 ++ .../reactivesocket/test/ClientSetupRule.java | 105 ++++++++++ .../io/reactivesocket/test/PingClient.java | 74 +++++++ .../io/reactivesocket/test/PingHandler.java | 51 +++++ .../test}/TestRequestHandler.java | 25 +-- reactivesocket-transport-local/build.gradle | 15 ++ .../local/LocalClientDuplexConnection.java | 99 ---------- .../LocalClientReactiveSocketConnector.java | 71 ------- .../local/LocalReactiveSocketManager.java | 54 ----- .../local/LocalServerDuplexConection.java | 99 ---------- .../LocalServerReactiveSocketConnector.java | 64 ------ .../client/LocalReactiveSocketConnector.java | 63 ++++++ .../server/LocalReactiveSocketServer.java | 89 +++++++++ .../local/ClientServerTest.java | 185 ------------------ .../local/LocalClientServerTest.java | 53 +++++ .../local/LocalClientSetupRule.java | 52 +++++ .../io/reactivesocket/local/LocalPing.java | 48 +++++ .../transport/tcp/ObserverSubscriber.java | 46 ----- .../transport/tcp/TcpDuplexConnection.java | 23 +-- .../client/TcpReactiveSocketConnector.java | 46 ++++- .../tcp/server/TcpReactiveSocketServer.java | 35 ++-- .../transport/tcp/ClientServerTest.java | 88 ++------- .../transport/tcp/ClientSetupRule.java | 78 -------- .../transport/tcp/PayloadImpl.java | 56 ------ .../io/reactivesocket/transport/tcp/Ping.java | 108 ---------- .../io/reactivesocket/transport/tcp/Pong.java | 56 ------ .../transport/tcp/TcpClientSetupRule.java | 45 +++++ .../reactivesocket/transport/tcp/TcpPing.java | 39 ++++ .../transport/tcp/TcpPongServer.java | 25 +++ 31 files changed, 896 insertions(+), 1033 deletions(-) create mode 100644 reactivesocket-core/src/main/java/io/reactivesocket/util/ObserverSubscriber.java create mode 100644 reactivesocket-core/src/main/java/io/reactivesocket/util/PayloadImpl.java create mode 100644 reactivesocket-test/src/main/java/io/reactivesocket/test/ClientSetupRule.java create mode 100644 reactivesocket-test/src/main/java/io/reactivesocket/test/PingClient.java create mode 100644 reactivesocket-test/src/main/java/io/reactivesocket/test/PingHandler.java rename {reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp => reactivesocket-test/src/main/java/io/reactivesocket/test}/TestRequestHandler.java (69%) delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java create mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/client/LocalReactiveSocketConnector.java create mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/server/LocalReactiveSocketServer.java delete mode 100644 reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java create mode 100644 reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientServerTest.java create mode 100644 reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientSetupRule.java create mode 100644 reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalPing.java delete mode 100644 reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ObserverSubscriber.java delete mode 100644 reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientSetupRule.java delete mode 100644 reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/PayloadImpl.java delete mode 100644 reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Ping.java delete mode 100644 reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Pong.java create mode 100644 reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpClientSetupRule.java create mode 100644 reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPing.java create mode 100644 reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPongServer.java diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/util/ObserverSubscriber.java b/reactivesocket-core/src/main/java/io/reactivesocket/util/ObserverSubscriber.java new file mode 100644 index 000000000..2dd84f95e --- /dev/null +++ b/reactivesocket-core/src/main/java/io/reactivesocket/util/ObserverSubscriber.java @@ -0,0 +1,42 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.util; + +import io.reactivesocket.Frame; +import io.reactivesocket.rx.Observer; +import rx.Subscriber; + +public class ObserverSubscriber extends Subscriber { + + private final Observer o; + + public ObserverSubscriber(Observer o) { + this.o = o; + } + + @Override + public void onCompleted() { + o.onComplete(); + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onNext(Frame frame) { + o.onNext(frame); + } +} diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/util/PayloadImpl.java b/reactivesocket-core/src/main/java/io/reactivesocket/util/PayloadImpl.java new file mode 100644 index 000000000..fbbdd1ed1 --- /dev/null +++ b/reactivesocket-core/src/main/java/io/reactivesocket/util/PayloadImpl.java @@ -0,0 +1,80 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.util; + +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +/** + * An implementation of {@link Payload} + */ +public class PayloadImpl implements Payload { + + private final ByteBuffer data; + private final ByteBuffer metadata; + + public PayloadImpl(String data) { + this(data, (String) null); + } + + public PayloadImpl(String data, String metadata) { + this(fromString(data), fromString(metadata)); + } + + public PayloadImpl(String data, Charset charset) { + this(fromString(data, charset), fromString(null)); + } + + public PayloadImpl(String data, Charset dataCharset, String metadata, Charset metaDataCharset) { + this(fromString(data, dataCharset), fromString(metadata, metaDataCharset)); + } + + public PayloadImpl(byte[] data) { + this(ByteBuffer.wrap(data), Frame.NULL_BYTEBUFFER); + } + + public PayloadImpl(byte[] data, byte[] metadata) { + this(ByteBuffer.wrap(data), ByteBuffer.wrap(metadata)); + } + + public PayloadImpl(ByteBuffer data) { + this(data, Frame.NULL_BYTEBUFFER); + } + + public PayloadImpl(ByteBuffer data, ByteBuffer metadata) { + this.data = data; + this.metadata = null == metadata ? Frame.NULL_BYTEBUFFER : metadata; + } + + @Override + public ByteBuffer getData() { + return data; + } + + @Override + public ByteBuffer getMetadata() { + return metadata; + } + + private static ByteBuffer fromString(String data) { + return fromString(data, Charset.defaultCharset()); + } + + private static ByteBuffer fromString(String data, Charset charset) { + return data == null ? Frame.NULL_BYTEBUFFER : ByteBuffer.wrap(data.getBytes(charset)); + } +} diff --git a/reactivesocket-test/build.gradle b/reactivesocket-test/build.gradle index 5eff68dbc..93c323017 100644 --- a/reactivesocket-test/build.gradle +++ b/reactivesocket-test/build.gradle @@ -1,3 +1,18 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + dependencies { compile project(':reactivesocket-core') + compile 'junit:junit:4.12' + compile 'org.mockito:mockito-core:1.10.19' } diff --git a/reactivesocket-test/src/main/java/io/reactivesocket/test/ClientSetupRule.java b/reactivesocket-test/src/main/java/io/reactivesocket/test/ClientSetupRule.java new file mode 100644 index 000000000..013075388 --- /dev/null +++ b/reactivesocket-test/src/main/java/io/reactivesocket/test/ClientSetupRule.java @@ -0,0 +1,105 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.test; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.ReactiveSocketConnector; +import org.junit.rules.ExternalResource; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; +import org.reactivestreams.Publisher; +import rx.Observable; +import rx.functions.Func0; +import rx.observers.TestSubscriber; + +import java.net.SocketAddress; +import java.util.function.Function; + +import static io.reactivesocket.test.TestUtil.*; +import static rx.RxReactiveStreams.*; + +public class ClientSetupRule extends ExternalResource { + + private final ReactiveSocketConnector client; + private final Func0 serverStarter; + private SocketAddress serverAddress; + private ReactiveSocket reactiveSocket; + + public ClientSetupRule(ReactiveSocketConnector connector, Func0 serverStarter) { + client = connector; + this.serverStarter = serverStarter; + } + + @Override + public Statement apply(final Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + serverAddress = serverStarter.call(); + reactiveSocket = toObservable(client.connect(serverAddress)).toSingle().toBlocking().value(); + + base.evaluate(); + } + }; + } + + public ReactiveSocketConnector getClient() { + return client; + } + + public SocketAddress getServerAddress() { + return serverAddress; + } + + public ReactiveSocket getReactiveSocket() { + return reactiveSocket; + } + + public void testRequestResponseN(int count) { + TestSubscriber ts = TestSubscriber.create(); + Observable + .range(1, count) + .flatMap(i -> toObservable(getReactiveSocket().requestResponse(utf8EncodedPayload("hello", "metadata"))) + .map(payload -> byteToString(payload.getData())) + ) + .doOnError(Throwable::printStackTrace) + .subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertValueCount(count); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + public void testRequestSubscription() { + _testStream( + socket -> toPublisher(toObservable(socket.requestSubscription(utf8EncodedPayload("hello", "metadata"))) + .take(10))); + } + + public void testRequestStream() { + _testStream(socket -> socket.requestStream(utf8EncodedPayload("hello", "metadata"))); + } + + private void _testStream(Function> invoker) { + TestSubscriber ts = TestSubscriber.create(); + Publisher publisher = invoker.apply(reactiveSocket); + toObservable(publisher).subscribe(ts); + ts.awaitTerminalEvent(); + ts.assertValueCount(10); + ts.assertNoErrors(); + ts.assertCompleted(); + } +} diff --git a/reactivesocket-test/src/main/java/io/reactivesocket/test/PingClient.java b/reactivesocket-test/src/main/java/io/reactivesocket/test/PingClient.java new file mode 100644 index 000000000..088844a3d --- /dev/null +++ b/reactivesocket-test/src/main/java/io/reactivesocket/test/PingClient.java @@ -0,0 +1,74 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.test; + +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.ReactiveSocketConnector; +import io.reactivesocket.util.PayloadImpl; +import org.HdrHistogram.Recorder; +import rx.Observable; +import rx.RxReactiveStreams; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.TimeUnit; + +public class PingClient { + + private final ReactiveSocketConnector connector; + private final String request; + private ReactiveSocket reactiveSocket; + + public PingClient(ReactiveSocketConnector connector) { + this.connector = connector; + request = "hello"; + } + + public PingClient connect(SocketAddress address) { + if (null == reactiveSocket) { + reactiveSocket = RxReactiveStreams.toObservable(connector.connect(address)) + .toSingle() + .toBlocking() + .value(); + } + return this; + } + + public Recorder startTracker(long interval, TimeUnit timeUnit) { + final Recorder histogram = new Recorder(3600000000000L, 3); + Observable.interval(interval, timeUnit) + .forEach(aLong -> { + System.out.println("---- PING/ PONG HISTO ----"); + histogram.getIntervalHistogram() + .outputPercentileDistribution(System.out, 5, 1000.0, false); + System.out.println("---- PING/ PONG HISTO ----"); + }); + return histogram; + } + + public Observable startPingPong(int count, final Recorder histogram) { + connect(new InetSocketAddress("localhost", 7878)); + return Observable.range(1, count) + .flatMap(i -> { + long start = System.nanoTime(); + return RxReactiveStreams.toObservable(reactiveSocket.requestResponse(new PayloadImpl(request))) + .doOnTerminate(() -> { + long diff = System.nanoTime() - start; + histogram.recordValue(diff); + }); + }, 16) + .doOnError(Throwable::printStackTrace); + } +} diff --git a/reactivesocket-test/src/main/java/io/reactivesocket/test/PingHandler.java b/reactivesocket-test/src/main/java/io/reactivesocket/test/PingHandler.java new file mode 100644 index 000000000..fb4938e7f --- /dev/null +++ b/reactivesocket-test/src/main/java/io/reactivesocket/test/PingHandler.java @@ -0,0 +1,51 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.test; + +import io.reactivesocket.ConnectionSetupHandler; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.Payload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.RequestHandler; +import io.reactivesocket.exceptions.SetupException; +import io.reactivesocket.util.PayloadImpl; +import rx.Observable; +import rx.RxReactiveStreams; + +import java.util.concurrent.ThreadLocalRandom; + +public class PingHandler implements ConnectionSetupHandler { + + private final byte[] pong; + + public PingHandler() { + pong = new byte[1024]; + ThreadLocalRandom.current().nextBytes(pong); + } + + public PingHandler(byte[] pong) { + this.pong = pong; + } + + @Override + public RequestHandler apply(ConnectionSetupPayload setupPayload, ReactiveSocket reactiveSocket) + throws SetupException { + return new RequestHandler.Builder() + .withRequestResponse(payload -> { + Payload responsePayload = new PayloadImpl(pong); + return RxReactiveStreams.toPublisher(Observable.just(responsePayload)); + }) + .build(); + } +} diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TestRequestHandler.java b/reactivesocket-test/src/main/java/io/reactivesocket/test/TestRequestHandler.java similarity index 69% rename from reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TestRequestHandler.java rename to reactivesocket-test/src/main/java/io/reactivesocket/test/TestRequestHandler.java index 33491d644..1f254e758 100644 --- a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TestRequestHandler.java +++ b/reactivesocket-test/src/main/java/io/reactivesocket/test/TestRequestHandler.java @@ -1,26 +1,21 @@ /* * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ -package io.reactivesocket.transport.tcp; +package io.reactivesocket.test; import io.reactivesocket.Payload; import io.reactivesocket.RequestHandler; import io.reactivesocket.exceptions.UnsupportedSetupException; -import io.reactivesocket.test.TestUtil; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import rx.Observable; diff --git a/reactivesocket-transport-local/build.gradle b/reactivesocket-transport-local/build.gradle index 887584cdc..1c943599d 100644 --- a/reactivesocket-transport-local/build.gradle +++ b/reactivesocket-transport-local/build.gradle @@ -1,4 +1,19 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + dependencies { + compile project(':reactivesocket-transport-tcp') compile project(':reactivesocket-core') + testCompile project(':reactivesocket-test') } diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java deleted file mode 100644 index bafdd3028..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.DuplexConnection; -import io.reactivesocket.Frame; -import io.reactivesocket.rx.Completable; -import io.reactivesocket.rx.Observable; -import io.reactivesocket.rx.Observer; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.io.IOException; -import java.util.concurrent.CopyOnWriteArrayList; - -class LocalClientDuplexConnection implements DuplexConnection { - private final String name; - - private final CopyOnWriteArrayList> subjects; - - public LocalClientDuplexConnection(String name) { - this.name = name; - this.subjects = new CopyOnWriteArrayList<>(); - } - - @Override - public Observable getInput() { - return o -> { - o.onSubscribe(() -> subjects.removeIf(s -> s == o)); - subjects.add(o); - }; - } - - @Override - public void addOutput(Publisher o, Completable callback) { - o - .subscribe(new Subscriber() { - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Frame frame) { - try { - LocalReactiveSocketManager - .getInstance() - .getServerConnection(name) - .write(frame); - } catch (Throwable t) { - onError(t); - } - } - - @Override - public void onError(Throwable t) { - callback.error(t); - } - - @Override - public void onComplete() { - callback.success(); - } - }); - } - - @Override - public double availability() { - return 1.0; - } - - void write(Frame frame) { - subjects - .forEach(o -> o.onNext(frame)); - } - - @Override - public void close() throws IOException { - LocalReactiveSocketManager - .getInstance() - .removeClientConnection(name); - - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java deleted file mode 100644 index 43740f4b1..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.*; -import io.reactivesocket.internal.rx.EmptySubscription; -import org.reactivestreams.Publisher; - -public class LocalClientReactiveSocketConnector implements ReactiveSocketConnector { - public static final LocalClientReactiveSocketConnector INSTANCE = new LocalClientReactiveSocketConnector(); - - private LocalClientReactiveSocketConnector() {} - - @Override - public Publisher connect(Config config) { - return s -> { - try { - s.onSubscribe(EmptySubscription.INSTANCE); - LocalClientDuplexConnection clientConnection = LocalReactiveSocketManager - .getInstance() - .getClientConnection(config.getName()); - ReactiveSocket reactiveSocket = DefaultReactiveSocket - .fromClientConnection(clientConnection, ConnectionSetupPayload.create(config.getMetadataMimeType(), config.getDataMimeType())); - - reactiveSocket.startAndWait(); - - s.onNext(reactiveSocket); - s.onComplete(); - } catch (Throwable t) { - s.onError(t); - } - }; - } - - public static class Config { - final String name; - final String metadataMimeType; - final String dataMimeType; - - public Config(String name, String metadataMimeType, String dataMimeType) { - this.name = name; - this.metadataMimeType = metadataMimeType; - this.dataMimeType = dataMimeType; - } - - public String getName() { - return name; - } - - public String getMetadataMimeType() { - return metadataMimeType; - } - - public String getDataMimeType() { - return dataMimeType; - } - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java deleted file mode 100644 index 60d246266..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import java.util.concurrent.ConcurrentHashMap; - -/** - * Created by rroeser on 4/2/16. - */ -class LocalReactiveSocketManager { - private static final LocalReactiveSocketManager INSTANCE = new LocalReactiveSocketManager(); - - private final ConcurrentHashMap serverConnections; - private final ConcurrentHashMap clientConnections; - - private LocalReactiveSocketManager() { - serverConnections = new ConcurrentHashMap<>(); - clientConnections = new ConcurrentHashMap<>(); - } - - public static LocalReactiveSocketManager getInstance() { - return INSTANCE; - } - - public LocalClientDuplexConnection getClientConnection(String name) { - return clientConnections.computeIfAbsent(name, LocalClientDuplexConnection::new); - } - - public void removeClientConnection(String name) { - clientConnections.remove(name); - } - - public LocalServerDuplexConection getServerConnection(String name) { - return serverConnections.computeIfAbsent(name, LocalServerDuplexConection::new); - } - - public void removeServerDuplexConnection(String name) { - serverConnections.remove(name); - } - -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java deleted file mode 100644 index baaf3800d..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.DuplexConnection; -import io.reactivesocket.Frame; -import io.reactivesocket.rx.Completable; -import io.reactivesocket.rx.Observable; -import io.reactivesocket.rx.Observer; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.io.IOException; -import java.util.concurrent.CopyOnWriteArrayList; - -class LocalServerDuplexConection implements DuplexConnection { - private final String name; - - private final CopyOnWriteArrayList> subjects; - - public LocalServerDuplexConection(String name) { - this.name = name; - this.subjects = new CopyOnWriteArrayList<>(); - } - - @Override - public Observable getInput() { - return o -> { - o.onSubscribe(() -> subjects.removeIf(s -> s == o)); - subjects.add(o); - }; - } - - @Override - public void addOutput(Publisher o, Completable callback) { - o - .subscribe(new Subscriber() { - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Frame frame) { - try { - LocalReactiveSocketManager - .getInstance() - .getClientConnection(name) - .write(frame); - } catch (Throwable t) { - onError(t); - } - } - - @Override - public void onError(Throwable t) { - callback.error(t); - } - - @Override - public void onComplete() { - callback.success(); - } - }); - } - - @Override - public double availability() { - return 1.0; - } - - void write(Frame frame) { - subjects - .forEach(o -> o.onNext(frame)); - } - - @Override - public void close() throws IOException { - LocalReactiveSocketManager - .getInstance() - .removeServerDuplexConnection(name); - - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java deleted file mode 100644 index 58ad1d62b..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.*; -import io.reactivesocket.internal.rx.EmptySubscription; -import org.reactivestreams.Publisher; - -public class LocalServerReactiveSocketConnector implements ReactiveSocketConnector { - public static final LocalServerReactiveSocketConnector INSTANCE = new LocalServerReactiveSocketConnector(); - - private LocalServerReactiveSocketConnector() {} - - @Override - public Publisher connect(Config config) { - return s -> { - try { - s.onSubscribe(EmptySubscription.INSTANCE); - LocalServerDuplexConection clientConnection = LocalReactiveSocketManager - .getInstance() - .getServerConnection(config.getName()); - ReactiveSocket reactiveSocket = DefaultReactiveSocket - .fromServerConnection(clientConnection, config.getConnectionSetupHandler()); - - reactiveSocket.startAndWait(); - s.onNext(reactiveSocket); - s.onComplete(); - } catch (Throwable t) { - s.onError(t); - } - }; - } - - public static class Config { - final String name; - final ConnectionSetupHandler connectionSetupHandler; - - public Config(String name, ConnectionSetupHandler connectionSetupHandler) { - this.name = name; - this.connectionSetupHandler = connectionSetupHandler; - } - - public ConnectionSetupHandler getConnectionSetupHandler() { - return connectionSetupHandler; - } - - public String getName() { - return name; - } - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/client/LocalReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/client/LocalReactiveSocketConnector.java new file mode 100644 index 000000000..8a836db3c --- /dev/null +++ b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/client/LocalReactiveSocketConnector.java @@ -0,0 +1,63 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.local.client; + +import io.netty.channel.local.LocalChannel; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.Frame; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.ReactiveSocketConnector; +import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector; +import io.reactivex.netty.RxNetty; +import io.reactivex.netty.protocol.tcp.client.TcpClient; +import org.reactivestreams.Publisher; + +import java.net.SocketAddress; +import java.util.function.Consumer; +import java.util.function.Function; + +public class LocalReactiveSocketConnector implements ReactiveSocketConnector { + + private final TcpReactiveSocketConnector delegate; + + private LocalReactiveSocketConnector(TcpReactiveSocketConnector delegate) { + this.delegate = delegate; + } + + @Override + public Publisher connect(SocketAddress address) { + return delegate.connect(address); + } + /** + * Configures the underlying {@link TcpClient} used by this connector. + * + * @param configurator Function to transform the client. + * + * @return A new {@link LocalReactiveSocketConnector} + */ + public LocalReactiveSocketConnector configureClient( + Function, TcpClient> configurator) { + return new LocalReactiveSocketConnector(delegate.configureClient(configurator)); + } + + public static LocalReactiveSocketConnector create(ConnectionSetupPayload setupPayload, + Consumer errorStream) { + TcpReactiveSocketConnector delegate = + TcpReactiveSocketConnector.create(setupPayload, errorStream, socketAddress -> { + return TcpClient.newClient(socketAddress, RxNetty.getRxEventLoopProvider().globalClientEventLoop(), + LocalChannel.class); + }); + return new LocalReactiveSocketConnector(delegate); + } +} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/server/LocalReactiveSocketServer.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/server/LocalReactiveSocketServer.java new file mode 100644 index 000000000..372ec3313 --- /dev/null +++ b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/server/LocalReactiveSocketServer.java @@ -0,0 +1,89 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.local.server; + +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalServerChannel; +import io.reactivesocket.ConnectionSetupHandler; +import io.reactivesocket.Frame; +import io.reactivesocket.LeaseGovernor; +import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer; +import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer.StartedServer; +import io.reactivex.netty.RxNetty; +import io.reactivex.netty.protocol.tcp.server.TcpServer; + +import java.util.function.Function; + +/** + * A server using local jvm sockets. + */ +public class LocalReactiveSocketServer { + + private final TcpReactiveSocketServer delegate; + + private LocalReactiveSocketServer(TcpReactiveSocketServer delegate) { + this.delegate = delegate; + } + + /** + * Starts this server and uses the passed {@code setupHandler} to setup accepted connection. + * + * @param setupHandler Setup handler for connections. + * + * @return A handle for the started server. + */ + public StartedServer start(ConnectionSetupHandler setupHandler) { + return start(setupHandler, LeaseGovernor.UNLIMITED_LEASE_GOVERNOR); + } + + /** + * Starts this server and uses the passed {@code setupHandler} and {@code leaseGovernor} to setup accepted + * connection. + * + * @param setupHandler Setup handler for connections. + * @param leaseGovernor To manage leases. + * + * @return A handle for the started server. + */ + public StartedServer start(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) { + return delegate.start(setupHandler, leaseGovernor); + } + + /** + * Configures the underlying server using the passed {@code configurator}. + * + * @param configurator Function to transform the underlying server. + * + * @return New instance of {@code LocalReactiveSocketServer}. + */ + public LocalReactiveSocketServer configureServer( + Function, TcpServer> configurator) { + return new LocalReactiveSocketServer(delegate.configureServer(configurator)); + } + + /** + * Creates a new local transport server with the passed {@code id}. + * + * @param id A unique identifier within the JVM. + * + * @return A new {@code {@link LocalReactiveSocketServer} + */ + public static LocalReactiveSocketServer create(String id) { + TcpReactiveSocketServer delegate = + TcpReactiveSocketServer.create(TcpServer.newServer(new LocalAddress(id), + RxNetty.getRxEventLoopProvider().globalServerEventLoop(), + LocalServerChannel.class)); + return new LocalReactiveSocketServer(delegate); + } +} diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java deleted file mode 100644 index a62871e8f..000000000 --- a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.ConnectionSetupHandler; -import io.reactivesocket.ConnectionSetupPayload; -import io.reactivesocket.Payload; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.RequestHandler; -import io.reactivesocket.exceptions.SetupException; -import io.reactivesocket.test.TestUtil; -import org.junit.BeforeClass; -import org.junit.Test; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import rx.Observable; -import rx.RxReactiveStreams; -import rx.observers.TestSubscriber; - -import java.util.concurrent.TimeUnit; - -import static io.reactivesocket.util.Unsafe.toSingleFuture; - -public class ClientServerTest { - - static ReactiveSocket client; - - static ReactiveSocket server; - - @BeforeClass - public static void setup() throws Exception { - LocalServerReactiveSocketConnector.Config serverConfig = new LocalServerReactiveSocketConnector.Config("test", new ConnectionSetupHandler() { - @Override - public RequestHandler apply(ConnectionSetupPayload setupPayload, ReactiveSocket rs) throws SetupException { - return new RequestHandler() { - @Override - public Publisher handleRequestResponse(Payload payload) { - return s -> { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - s.onNext(response); - s.onComplete(); - }; - } - - @Override - public Publisher handleRequestStream(Payload payload) { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response)); - } - - @Override - public Publisher handleSubscription(Payload payload) { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response) - .repeat()); - } - - @Override - public Publisher handleFireAndForget(Payload payload) { - return Subscriber::onComplete; - } - - @Override - public Publisher handleChannel(Payload initialPayload, Publisher inputs) { - return null; - } - - @Override - public Publisher handleMetadataPush(Payload payload) { - return null; - } - }; - } - }); - - server = toSingleFuture(LocalServerReactiveSocketConnector.INSTANCE.connect(serverConfig)).get(5, TimeUnit.SECONDS); - - LocalClientReactiveSocketConnector.Config clientConfig = new LocalClientReactiveSocketConnector.Config("test", "text", "text"); - client = toSingleFuture(LocalClientReactiveSocketConnector.INSTANCE.connect(clientConfig)).get(5, TimeUnit.SECONDS);; - } - - @Test - public void testRequestResponse1() { - requestResponseN(1500, 1); - } - - @Test - public void testRequestResponse10() { - requestResponseN(1500, 10); - } - - - @Test - public void testRequestResponse100() { - requestResponseN(1500, 100); - } - - @Test - public void testRequestResponse10_000() { - requestResponseN(60_000, 10_000); - } - - - @Test - public void testRequestResponse100_000() { - requestResponseN(60_000, 10_000); - } - @Test - public void testRequestResponse1_000_000() { - requestResponseN(60_000, 10_000); - } - - @Test - public void testRequestStream() { - TestSubscriber ts = TestSubscriber.create(); - - RxReactiveStreams - .toObservable(client.requestStream(TestUtil.utf8EncodedPayload("hello", "metadata"))) - .subscribe(ts); - - - ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); - ts.assertValueCount(10); - ts.assertNoErrors(); - ts.assertCompleted(); - } - - @Test - public void testRequestSubscription() throws InterruptedException { - TestSubscriber ts = TestSubscriber.create(); - - RxReactiveStreams - .toObservable(client.requestSubscription(TestUtil.utf8EncodedPayload("hello sub", "metadata sub"))) - .take(10) - .subscribe(ts); - - ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); - ts.assertValueCount(10); - ts.assertNoErrors(); - } - - - public void requestResponseN(int timeout, int count) { - - TestSubscriber ts = TestSubscriber.create(); - - Observable - .range(1, count) - .flatMap(i -> - RxReactiveStreams - .toObservable(client.requestResponse(TestUtil.utf8EncodedPayload("hello", "metadata"))) - .map(payload -> TestUtil.byteToString(payload.getData())) - ) - .doOnError(Throwable::printStackTrace) - .subscribe(ts); - - ts.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS); - ts.assertValueCount(count); - ts.assertNoErrors(); - ts.assertCompleted(); - } - - -} \ No newline at end of file diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientServerTest.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientServerTest.java new file mode 100644 index 000000000..89445eb91 --- /dev/null +++ b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientServerTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.local; + +import org.junit.Rule; +import org.junit.Test; + +public class LocalClientServerTest { + + @Rule + public final LocalClientSetupRule setup = new LocalClientSetupRule(); + + @Test(timeout = 60000) + public void testRequestResponse1() { + setup.testRequestResponseN(1); + } + + @Test(timeout = 60000) + public void testRequestResponse10() { + setup.testRequestResponseN(10); + } + + + @Test(timeout = 60000) + public void testRequestResponse100() { + setup.testRequestResponseN(100); + } + + @Test(timeout = 60000) + public void testRequestResponse10_000() { + setup.testRequestResponseN(10_000); + } + + @Test(timeout = 60000) + public void testRequestStream() { + setup.testRequestStream(); + } + + @Test(timeout = 60000) + public void testRequestSubscription() throws InterruptedException { + setup.testRequestSubscription(); + } +} \ No newline at end of file diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientSetupRule.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientSetupRule.java new file mode 100644 index 000000000..26ef0a895 --- /dev/null +++ b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientSetupRule.java @@ -0,0 +1,52 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.local; + +import io.netty.util.internal.ThreadLocalRandom; +import io.reactivesocket.ConnectionSetupHandler; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.RequestHandler; +import io.reactivesocket.local.client.LocalReactiveSocketConnector; +import io.reactivesocket.local.server.LocalReactiveSocketServer; +import io.reactivesocket.test.ClientSetupRule; +import io.reactivesocket.test.TestRequestHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.netty.handler.logging.LogLevel.*; + +public class LocalClientSetupRule extends ClientSetupRule { + + private static final Logger logger = LoggerFactory.getLogger(LocalClientSetupRule.class); + + public LocalClientSetupRule() { + super(LocalReactiveSocketConnector.create(ConnectionSetupPayload.create("", ""), Throwable::printStackTrace) + .configureClient(client -> client.enableWireLogging("test-client", DEBUG)), + () -> { + String id = "test-" + ThreadLocalRandom.current().nextInt(100); + logger.info("Local socket id: " + id); + + return LocalReactiveSocketServer.create(id) + .configureServer(server -> server.enableWireLogging(id, DEBUG)) + .start(new ConnectionSetupHandler() { + @Override + public RequestHandler apply(ConnectionSetupPayload s, ReactiveSocket rs) { + return new TestRequestHandler(); + } + }).getServerAddress(); + }); + } + +} diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalPing.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalPing.java new file mode 100644 index 000000000..83ea466c2 --- /dev/null +++ b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalPing.java @@ -0,0 +1,48 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.local; + +import io.netty.channel.local.LocalAddress; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.local.client.LocalReactiveSocketConnector; +import io.reactivesocket.local.server.LocalReactiveSocketServer; +import io.reactivesocket.test.PingClient; +import io.reactivesocket.test.PingHandler; +import org.HdrHistogram.Recorder; + +import java.util.concurrent.TimeUnit; + +public final class LocalPing { + + public static final String SERVER_ID = "local-pong"; + + public static void main(String... args) throws Exception { + + LocalReactiveSocketServer.create(SERVER_ID) + .start(new PingHandler()); + + ConnectionSetupPayload payload = ConnectionSetupPayload.create("", ""); + LocalReactiveSocketConnector connector = LocalReactiveSocketConnector.create(payload, + Throwable::printStackTrace); + PingClient pingClient = new PingClient(connector); + Recorder recorder = pingClient.startTracker(1, TimeUnit.SECONDS); + final int count = 1_000_000; + pingClient.connect(new LocalAddress(SERVER_ID)) + .startPingPong(count, recorder) + .doOnTerminate(() -> { + System.out.println("Sent " + count + " messages."); + }) + .toBlocking() + .last(); + } +} diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ObserverSubscriber.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ObserverSubscriber.java deleted file mode 100644 index c4872e734..000000000 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ObserverSubscriber.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.reactivesocket.transport.tcp; - -import io.reactivesocket.Frame; -import io.reactivesocket.rx.Observer; -import rx.Subscriber; - -public class ObserverSubscriber extends Subscriber { - - private final Observer o; - - public ObserverSubscriber(Observer o) { - this.o = o; - } - - @Override - public void onCompleted() { - o.onComplete(); - } - - @Override - public void onError(Throwable e) { - o.onError(e); - } - - @Override - public void onNext(Frame frame) { - o.onNext(frame); - } -} diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java index 53e9cb298..6c6ed20e5 100644 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java @@ -1,18 +1,14 @@ /* * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package io.reactivesocket.transport.tcp; @@ -21,6 +17,7 @@ import io.reactivesocket.internal.rx.BooleanDisposable; import io.reactivesocket.rx.Completable; import io.reactivesocket.rx.Observable; +import io.reactivesocket.util.ObserverSubscriber; import io.reactivex.netty.channel.Connection; import org.reactivestreams.Publisher; import rx.RxReactiveStreams; diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java index ac647eda2..3c8f4184b 100644 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java @@ -1,3 +1,16 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + package io.reactivesocket.transport.tcp.client; import io.netty.buffer.ByteBuf; @@ -31,10 +44,10 @@ public class TcpReactiveSocketConnector implements ReactiveSocketConnector> socketFactories; private final ConnectionSetupPayload setupPayload; private final Consumer errorStream; - private final Function> clientFactory; + private final Function> clientFactory; private TcpReactiveSocketConnector(ConnectionSetupPayload setupPayload, Consumer errorStream, - Function> clientFactory) { + Function> clientFactory) { this.setupPayload = setupPayload; this.errorStream = errorStream; this.clientFactory = clientFactory; @@ -44,12 +57,24 @@ private TcpReactiveSocketConnector(ConnectionSetupPayload setupPayload, Consumer @Override public Publisher connect(SocketAddress address) { return _connect(socketFactories.computeIfAbsent(address, socketAddress -> { - return clientFactory.apply(socketAddress) - .addChannelHandlerLast("length-codec", ReactiveSocketLengthCodec::new) - .addChannelHandlerLast("frame-codec", ReactiveSocketFrameCodec::new); + return clientFactory.apply(socketAddress); })); } + /** + * Configures the underlying {@link TcpClient} used by this connector. + * + * @param configurator Function to transform the client. + * + * @return A new {@link TcpReactiveSocketConnector} + */ + public TcpReactiveSocketConnector configureClient( + Function, TcpClient> configurator) { + return new TcpReactiveSocketConnector(setupPayload, errorStream, socketAddress -> { + return configurator.apply(clientFactory.apply(socketAddress)); + }); + } + private Publisher _connect(TcpClient client) { Single r = Single.create(new OnSubscribe() { @Override @@ -98,12 +123,19 @@ public String toString() { public static TcpReactiveSocketConnector create(ConnectionSetupPayload setupPayload, Consumer errorStream) { return new TcpReactiveSocketConnector(setupPayload, errorStream, - socketAddress -> TcpClient.newClient(socketAddress)); + socketAddress -> _configureClient(TcpClient.newClient(socketAddress))); } public static TcpReactiveSocketConnector create(ConnectionSetupPayload setupPayload, Consumer errorStream, Function> clientFactory) { - return new TcpReactiveSocketConnector(setupPayload, errorStream, clientFactory); + return new TcpReactiveSocketConnector(setupPayload, errorStream, socketAddress -> { + return _configureClient(clientFactory.apply(socketAddress)); + }); + } + + private static TcpClient _configureClient(TcpClient client) { + return client.addChannelHandlerLast("length-codec", ReactiveSocketLengthCodec::new) + .addChannelHandlerLast("frame-codec", ReactiveSocketFrameCodec::new); } } diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/TcpReactiveSocketServer.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/TcpReactiveSocketServer.java index ac734fd41..3c1681134 100644 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/TcpReactiveSocketServer.java +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/TcpReactiveSocketServer.java @@ -1,18 +1,14 @@ /* * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package io.reactivesocket.transport.tcp.server; @@ -35,6 +31,7 @@ import rx.Observable; import java.net.SocketAddress; +import java.util.function.Function; public class TcpReactiveSocketServer { @@ -87,6 +84,18 @@ public void error(Throwable e) { return new StartedServer(); } + /** + * Configures the underlying server using the passed {@code configurator}. + * + * @param configurator Function to transform the underlying server. + * + * @return New instance of {@code TcpReactiveSocketServer}. + */ + public TcpReactiveSocketServer configureServer( + Function, TcpServer> configurator) { + return new TcpReactiveSocketServer(configurator.apply(server)); + } + public static TcpReactiveSocketServer create() { return create(TcpServer.newServer()); } diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientServerTest.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientServerTest.java index 273145516..e2e5b1ffe 100644 --- a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientServerTest.java +++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientServerTest.java @@ -1,104 +1,54 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package io.reactivesocket.transport.tcp; -import io.reactivesocket.Payload; -import io.reactivesocket.test.TestUtil; +import io.reactivesocket.test.ClientSetupRule; import org.junit.Rule; import org.junit.Test; -import rx.Observable; -import rx.observers.TestSubscriber; - -import java.util.concurrent.TimeUnit; - -import static io.reactivesocket.test.TestUtil.*; -import static rx.RxReactiveStreams.*; public class ClientServerTest { @Rule - public final ClientSetupRule setup = new ClientSetupRule(); + public final ClientSetupRule setup = new TcpClientSetupRule(); @Test(timeout = 60000) public void testRequestResponse1() { - requestResponseN(1500, 1); + setup.testRequestResponseN(1); } @Test(timeout = 60000) public void testRequestResponse10() { - requestResponseN(1500, 10); + setup.testRequestResponseN(10); } @Test(timeout = 60000) public void testRequestResponse100() { - requestResponseN(1500, 100); + setup.testRequestResponseN(100); } @Test(timeout = 60000) public void testRequestResponse10_000() { - requestResponseN(60_000, 10_000); + setup.testRequestResponseN(10_000); } @Test(timeout = 60000) public void testRequestStream() { - TestSubscriber ts = TestSubscriber.create(); - - toObservable(setup.getReactiveSocket().requestStream(utf8EncodedPayload("hello", "metadata"))) - .subscribe(ts); - - - ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); - ts.assertValueCount(10); - ts.assertNoErrors(); - ts.assertCompleted(); + setup.testRequestStream(); } @Test(timeout = 60000) public void testRequestSubscription() throws InterruptedException { - TestSubscriber ts = TestSubscriber.create(); - - toObservable(setup.getReactiveSocket().requestSubscription(utf8EncodedPayload("hello sub", "metadata sub"))) - .take(10) - .subscribe(ts); - - ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); - ts.assertValueCount(10); - ts.assertNoErrors(); + setup.testRequestSubscription(); } - - - public void requestResponseN(int timeout, int count) { - - TestSubscriber ts = TestSubscriber.create(); - - Observable - .range(1, count) - .flatMap(i -> - toObservable(setup.getReactiveSocket().requestResponse(utf8EncodedPayload("hello", "metadata"))) - .map(payload -> byteToString(payload.getData())) - ) - .doOnError(Throwable::printStackTrace) - .subscribe(ts); - - ts.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS); - ts.assertValueCount(count); - ts.assertNoErrors(); - ts.assertCompleted(); - } - - } \ No newline at end of file diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientSetupRule.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientSetupRule.java deleted file mode 100644 index e465d7119..000000000 --- a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientSetupRule.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.reactivesocket.transport.tcp; - -import io.reactivesocket.ConnectionSetupHandler; -import io.reactivesocket.ConnectionSetupPayload; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.RequestHandler; -import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector; -import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer; -import org.junit.rules.ExternalResource; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; -import rx.RxReactiveStreams; - -import java.net.SocketAddress; - -public class ClientSetupRule extends ExternalResource { - - private TcpReactiveSocketConnector client; - private TcpReactiveSocketServer server; - private SocketAddress serverAddress; - private ReactiveSocket reactiveSocket; - - @Override - public Statement apply(final Statement base, Description description) { - return new Statement() { - @Override - public void evaluate() throws Throwable { - server = TcpReactiveSocketServer.create(0); - serverAddress = server.start(new ConnectionSetupHandler() { - @Override - public RequestHandler apply(ConnectionSetupPayload s, ReactiveSocket rs) { - return new TestRequestHandler(); - } - }).getServerAddress(); - - client = TcpReactiveSocketConnector.create(ConnectionSetupPayload.create("", ""), - Throwable::printStackTrace); - reactiveSocket = RxReactiveStreams.toObservable(client.connect(serverAddress)) - .toSingle().toBlocking().value(); - - base.evaluate(); - } - }; - } - - public TcpReactiveSocketConnector getClient() { - return client; - } - - public TcpReactiveSocketServer getServer() { - return server; - } - - public SocketAddress getServerAddress() { - return serverAddress; - } - - public ReactiveSocket getReactiveSocket() { - return reactiveSocket; - } -} diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/PayloadImpl.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/PayloadImpl.java deleted file mode 100644 index 2cf54ff3a..000000000 --- a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/PayloadImpl.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.reactivesocket.transport.tcp; - -import io.reactivesocket.Frame; -import io.reactivesocket.Payload; - -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - -public class PayloadImpl implements Payload { - - private final ByteBuffer data; - - public PayloadImpl(String data) { - this.data = ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)); - } - - public PayloadImpl(String data, Charset charset) { - this.data = ByteBuffer.wrap(data.getBytes(charset)); - } - - public PayloadImpl(byte[] data) { - this.data = ByteBuffer.wrap(data); - } - - public PayloadImpl(ByteBuffer data) { - this.data = data; - } - - @Override - public ByteBuffer getData() { - return data; - } - - @Override - public ByteBuffer getMetadata() { - return Frame.NULL_BYTEBUFFER; - } -} diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Ping.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Ping.java deleted file mode 100644 index fac087d0c..000000000 --- a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Ping.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.transport.tcp; - -import io.reactivesocket.ConnectionSetupPayload; -import io.reactivesocket.Payload; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector; -import org.HdrHistogram.Recorder; -import rx.Observable; -import rx.RxReactiveStreams; -import rx.Subscriber; -import rx.schedulers.Schedulers; - -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public final class Ping { - - public static void main(String... args) throws Exception { - - ReactiveSocket reactiveSocket = - RxReactiveStreams.toObservable(TcpReactiveSocketConnector.create(ConnectionSetupPayload.create("", ""), - Throwable::printStackTrace) - .connect(new InetSocketAddress("localhost", 7878))) - .toSingle() - .toBlocking() - .value(); - - byte[] data = "hello".getBytes(StandardCharsets.UTF_8); - - Payload keyPayload = new Payload() { - @Override - public ByteBuffer getData() { - return ByteBuffer.wrap(data); - } - - @Override - public ByteBuffer getMetadata() { - return null; - } - }; - - int n = 1_000_000; - CountDownLatch latch = new CountDownLatch(n); - final Recorder histogram = new Recorder(3600000000000L, 3); - - Schedulers - .computation() - .createWorker() - .schedulePeriodically(() -> { - System.out.println("---- PING/ PONG HISTO ----"); - histogram.getIntervalHistogram() - .outputPercentileDistribution(System.out, 5, 1000.0, false); - System.out.println("---- PING/ PONG HISTO ----"); - }, 1, 1, TimeUnit.SECONDS); - - Observable - .range(1, Integer.MAX_VALUE) - .flatMap(i -> { - long start = System.nanoTime(); - - return RxReactiveStreams.toObservable(reactiveSocket.requestResponse(keyPayload)) - .doOnError(Throwable::printStackTrace) - .doOnNext(s -> { - long diff = System.nanoTime() - start; - histogram.recordValue(diff); - }); - }, 16) - .doOnError(Throwable::printStackTrace) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - - } - - @Override - public void onError(Throwable e) { - e.printStackTrace(); - } - - @Override - public void onNext(Payload payload) { - latch.countDown(); - } - }); - - latch.await(1, TimeUnit.HOURS); - System.out.println("Sent => " + n); - System.exit(0); - } -} diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Pong.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Pong.java deleted file mode 100644 index 58cd04f39..000000000 --- a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Pong.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.transport.tcp; - -import io.netty.util.internal.ThreadLocalRandom; -import io.reactivesocket.Payload; -import io.reactivesocket.RequestHandler; -import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer; -import rx.Observable; -import rx.RxReactiveStreams; - -import java.nio.ByteBuffer; - -public final class Pong { - - public static void main(String... args) throws Exception { - byte[] response = new byte[1024]; - ThreadLocalRandom.current().nextBytes(response); - - TcpReactiveSocketServer.create(7878) - .start((setupPayload, reactiveSocket) -> { - return new RequestHandler.Builder() - .withRequestResponse(payload -> { - Payload responsePayload = new Payload() { - ByteBuffer data = ByteBuffer.wrap(response); - ByteBuffer metadata = ByteBuffer.allocate(0); - - @Override - public ByteBuffer getData() { - return data; - } - - @Override - public ByteBuffer getMetadata() { - return metadata; - } - }; - return RxReactiveStreams.toPublisher(Observable.just(responsePayload)); - }) - .build(); - }).awaitShutdown(); - } -} diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpClientSetupRule.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpClientSetupRule.java new file mode 100644 index 000000000..4e261533f --- /dev/null +++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpClientSetupRule.java @@ -0,0 +1,45 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.transport.tcp; + +import io.reactivesocket.ConnectionSetupHandler; +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.ReactiveSocket; +import io.reactivesocket.RequestHandler; +import io.reactivesocket.test.ClientSetupRule; +import io.reactivesocket.test.TestRequestHandler; +import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector; +import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer; + +import static io.netty.handler.logging.LogLevel.*; + +public class TcpClientSetupRule extends ClientSetupRule { + + public TcpClientSetupRule() { + super(TcpReactiveSocketConnector.create(ConnectionSetupPayload.create("", ""), Throwable::printStackTrace) + .configureClient(client -> client.enableWireLogging("test-client", + DEBUG)), + () -> { + return TcpReactiveSocketServer.create(0) + .configureServer(server -> server.enableWireLogging("test-server", DEBUG)) + .start(new ConnectionSetupHandler() { + @Override + public RequestHandler apply(ConnectionSetupPayload s, ReactiveSocket rs) { + return new TestRequestHandler(); + } + }).getServerAddress(); + }); + } + +} diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPing.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPing.java new file mode 100644 index 000000000..0eb07e4ed --- /dev/null +++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPing.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.transport.tcp; + +import io.reactivesocket.ConnectionSetupPayload; +import io.reactivesocket.test.PingClient; +import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector; +import org.HdrHistogram.Recorder; + +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; + +public final class TcpPing { + + public static void main(String... args) throws Exception { + ConnectionSetupPayload payload = ConnectionSetupPayload.create("", ""); + TcpReactiveSocketConnector connector = TcpReactiveSocketConnector.create(payload, Throwable::printStackTrace); + PingClient pingClient = new PingClient(connector); + Recorder recorder = pingClient.startTracker(1, TimeUnit.SECONDS); + final int count = 1_000_000; + pingClient.connect(new InetSocketAddress("localhost", 7878)) + .startPingPong(count, recorder) + .doOnTerminate(() -> { + System.out.println("Sent " + count + " messages."); + }) + .toBlocking() + .last(); + } +} diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPongServer.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPongServer.java new file mode 100644 index 000000000..7ed25b2ca --- /dev/null +++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPongServer.java @@ -0,0 +1,25 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.reactivesocket.transport.tcp; + +import io.reactivesocket.test.PingHandler; +import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer; + +public final class TcpPongServer { + + public static void main(String... args) throws Exception { + TcpReactiveSocketServer.create(7878) + .start(new PingHandler()) + .awaitShutdown(); + } +}