diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/util/ObserverSubscriber.java b/reactivesocket-core/src/main/java/io/reactivesocket/util/ObserverSubscriber.java new file mode 100644 index 000000000..2dd84f95e --- /dev/null +++ b/reactivesocket-core/src/main/java/io/reactivesocket/util/ObserverSubscriber.java @@ -0,0 +1,42 @@ +/* + * Copyright 2016 Netflix, Inc. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.util; + +import io.reactivesocket.Frame; +import io.reactivesocket.rx.Observer; +import rx.Subscriber; + +public class ObserverSubscriber extends Subscriber { + + private final Observer o; + + public ObserverSubscriber(Observer o) { + this.o = o; + } + + @Override + public void onCompleted() { + o.onComplete(); + } + + @Override + public void onError(Throwable e) { + o.onError(e); + } + + @Override + public void onNext(Frame frame) { + o.onNext(frame); + } +} diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/util/PayloadImpl.java b/reactivesocket-core/src/main/java/io/reactivesocket/util/PayloadImpl.java new file mode 100644 index 000000000..fbbdd1ed1 --- /dev/null +++ b/reactivesocket-core/src/main/java/io/reactivesocket/util/PayloadImpl.java @@ -0,0 +1,80 @@ +/* + * Copyright 2016 Netflix, Inc. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.util; + +import io.reactivesocket.Frame; +import io.reactivesocket.Payload; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +/** + * An implementation of {@link Payload} + */ +public class PayloadImpl implements Payload { + + private final ByteBuffer data; + private final ByteBuffer metadata; + + public PayloadImpl(String data) { + this(data, (String) null); + } + + public PayloadImpl(String data, String metadata) { + this(fromString(data), fromString(metadata)); + } + + public PayloadImpl(String data, Charset charset) { + this(fromString(data, charset), fromString(null)); + } + + public PayloadImpl(String data, Charset dataCharset, String metadata, Charset metaDataCharset) { + this(fromString(data, dataCharset), fromString(metadata, metaDataCharset)); + } + + public PayloadImpl(byte[] data) { + this(ByteBuffer.wrap(data), Frame.NULL_BYTEBUFFER); + } + + public PayloadImpl(byte[] data, byte[] metadata) { + this(ByteBuffer.wrap(data), ByteBuffer.wrap(metadata)); + } + + public PayloadImpl(ByteBuffer data) { + this(data, Frame.NULL_BYTEBUFFER); + } + + public PayloadImpl(ByteBuffer data, ByteBuffer metadata) { + this.data = data; + this.metadata = null == metadata ? Frame.NULL_BYTEBUFFER : metadata; + } + + @Override + public ByteBuffer getData() { + return data; + } + + @Override + public ByteBuffer getMetadata() { + return metadata; + } + + private static ByteBuffer fromString(String data) { + return fromString(data, Charset.defaultCharset()); + } + + private static ByteBuffer fromString(String data, Charset charset) { + return data == null ? Frame.NULL_BYTEBUFFER : ByteBuffer.wrap(data.getBytes(charset)); + } +} diff --git a/reactivesocket-test/build.gradle b/reactivesocket-test/build.gradle index 5eff68dbc..93c323017 100644 --- a/reactivesocket-test/build.gradle +++ b/reactivesocket-test/build.gradle @@ -1,3 +1,18 @@ +/* + * Copyright 2016 Netflix, Inc. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + dependencies { compile project(':reactivesocket-core') + compile 'junit:junit:4.12' + compile 'org.mockito:mockito-core:1.10.19' } diff --git a/reactivesocket-test/src/main/java/io/reactivesocket/test/ClientSetupRule.java b/reactivesocket-test/src/main/java/io/reactivesocket/test/ClientSetupRule.java new file mode 100644 index 000000000..013075388 --- /dev/null +++ b/reactivesocket-test/src/main/java/io/reactivesocket/test/ClientSetupRule.java @@ -0,0 +1,105 @@ +/* + * Copyright 2016 Netflix, Inc. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivesocket.test;
+
+import io.reactivesocket.Payload;
+import io.reactivesocket.ReactiveSocket;
+import io.reactivesocket.ReactiveSocketConnector;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import org.reactivestreams.Publisher;
+import rx.Observable;
+import rx.functions.Func0;
+import rx.observers.TestSubscriber;
+
+import java.net.SocketAddress;
+import java.util.function.Function;
+
+import static io.reactivesocket.test.TestUtil.*;
+import static rx.RxReactiveStreams.*;
+
+public class ClientSetupRule extends ExternalResource {
+
+ private final ReactiveSocketConnector
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivesocket.test;
+
+import io.reactivesocket.Payload;
+import io.reactivesocket.ReactiveSocket;
+import io.reactivesocket.ReactiveSocketConnector;
+import io.reactivesocket.util.PayloadImpl;
+import org.HdrHistogram.Recorder;
+import rx.Observable;
+import rx.RxReactiveStreams;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
+
+public class PingClient {
+
+ private final ReactiveSocketConnector
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivesocket.test;
+
+import io.reactivesocket.ConnectionSetupHandler;
+import io.reactivesocket.ConnectionSetupPayload;
+import io.reactivesocket.Payload;
+import io.reactivesocket.ReactiveSocket;
+import io.reactivesocket.RequestHandler;
+import io.reactivesocket.exceptions.SetupException;
+import io.reactivesocket.util.PayloadImpl;
+import rx.Observable;
+import rx.RxReactiveStreams;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class PingHandler implements ConnectionSetupHandler {
+
+ private final byte[] pong;
+
+ public PingHandler() {
+ pong = new byte[1024];
+ ThreadLocalRandom.current().nextBytes(pong);
+ }
+
+ public PingHandler(byte[] pong) {
+ this.pong = pong;
+ }
+
+ @Override
+ public RequestHandler apply(ConnectionSetupPayload setupPayload, ReactiveSocket reactiveSocket)
+ throws SetupException {
+ return new RequestHandler.Builder()
+ .withRequestResponse(payload -> {
+ Payload responsePayload = new PayloadImpl(pong);
+ return RxReactiveStreams.toPublisher(Observable.just(responsePayload));
+ })
+ .build();
+ }
+}
diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TestRequestHandler.java b/reactivesocket-test/src/main/java/io/reactivesocket/test/TestRequestHandler.java
similarity index 69%
rename from reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TestRequestHandler.java
rename to reactivesocket-test/src/main/java/io/reactivesocket/test/TestRequestHandler.java
index 33491d644..1f254e758 100644
--- a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TestRequestHandler.java
+++ b/reactivesocket-test/src/main/java/io/reactivesocket/test/TestRequestHandler.java
@@ -1,26 +1,21 @@
/*
* Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
-package io.reactivesocket.transport.tcp;
+package io.reactivesocket.test;
import io.reactivesocket.Payload;
import io.reactivesocket.RequestHandler;
import io.reactivesocket.exceptions.UnsupportedSetupException;
-import io.reactivesocket.test.TestUtil;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import rx.Observable;
diff --git a/reactivesocket-transport-local/build.gradle b/reactivesocket-transport-local/build.gradle
index 887584cdc..1c943599d 100644
--- a/reactivesocket-transport-local/build.gradle
+++ b/reactivesocket-transport-local/build.gradle
@@ -1,4 +1,19 @@
+/*
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
dependencies {
+ compile project(':reactivesocket-transport-tcp')
compile project(':reactivesocket-core')
+
testCompile project(':reactivesocket-test')
}
diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java
deleted file mode 100644
index bafdd3028..000000000
--- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.reactivesocket.local;
-
-import io.reactivesocket.DuplexConnection;
-import io.reactivesocket.Frame;
-import io.reactivesocket.rx.Completable;
-import io.reactivesocket.rx.Observable;
-import io.reactivesocket.rx.Observer;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-import java.io.IOException;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-class LocalClientDuplexConnection implements DuplexConnection {
- private final String name;
-
- private final CopyOnWriteArrayList
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivesocket.local.client;
+
+import io.netty.channel.local.LocalChannel;
+import io.reactivesocket.ConnectionSetupPayload;
+import io.reactivesocket.Frame;
+import io.reactivesocket.ReactiveSocket;
+import io.reactivesocket.ReactiveSocketConnector;
+import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector;
+import io.reactivex.netty.RxNetty;
+import io.reactivex.netty.protocol.tcp.client.TcpClient;
+import org.reactivestreams.Publisher;
+
+import java.net.SocketAddress;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public class LocalReactiveSocketConnector implements ReactiveSocketConnector
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivesocket.local.server;
+
+import io.netty.channel.local.LocalAddress;
+import io.netty.channel.local.LocalServerChannel;
+import io.reactivesocket.ConnectionSetupHandler;
+import io.reactivesocket.Frame;
+import io.reactivesocket.LeaseGovernor;
+import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer;
+import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer.StartedServer;
+import io.reactivex.netty.RxNetty;
+import io.reactivex.netty.protocol.tcp.server.TcpServer;
+
+import java.util.function.Function;
+
+/**
+ * A server using local jvm sockets.
+ */
+public class LocalReactiveSocketServer {
+
+ private final TcpReactiveSocketServer delegate;
+
+ private LocalReactiveSocketServer(TcpReactiveSocketServer delegate) {
+ this.delegate = delegate;
+ }
+
+ /**
+ * Starts this server and uses the passed {@code setupHandler} to setup accepted connection.
+ *
+ * @param setupHandler Setup handler for connections.
+ *
+ * @return A handle for the started server.
+ */
+ public StartedServer start(ConnectionSetupHandler setupHandler) {
+ return start(setupHandler, LeaseGovernor.UNLIMITED_LEASE_GOVERNOR);
+ }
+
+ /**
+ * Starts this server and uses the passed {@code setupHandler} and {@code leaseGovernor} to setup accepted
+ * connection.
+ *
+ * @param setupHandler Setup handler for connections.
+ * @param leaseGovernor To manage leases.
+ *
+ * @return A handle for the started server.
+ */
+ public StartedServer start(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) {
+ return delegate.start(setupHandler, leaseGovernor);
+ }
+
+ /**
+ * Configures the underlying server using the passed {@code configurator}.
+ *
+ * @param configurator Function to transform the underlying server.
+ *
+ * @return New instance of {@code LocalReactiveSocketServer}.
+ */
+ public LocalReactiveSocketServer configureServer(
+ Function
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.reactivesocket.local;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+public class LocalClientServerTest {
+
+ @Rule
+ public final LocalClientSetupRule setup = new LocalClientSetupRule();
+
+ @Test(timeout = 60000)
+ public void testRequestResponse1() {
+ setup.testRequestResponseN(1);
+ }
+
+ @Test(timeout = 60000)
+ public void testRequestResponse10() {
+ setup.testRequestResponseN(10);
+ }
+
+
+ @Test(timeout = 60000)
+ public void testRequestResponse100() {
+ setup.testRequestResponseN(100);
+ }
+
+ @Test(timeout = 60000)
+ public void testRequestResponse10_000() {
+ setup.testRequestResponseN(10_000);
+ }
+
+ @Test(timeout = 60000)
+ public void testRequestStream() {
+ setup.testRequestStream();
+ }
+
+ @Test(timeout = 60000)
+ public void testRequestSubscription() throws InterruptedException {
+ setup.testRequestSubscription();
+ }
+}
\ No newline at end of file
diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientSetupRule.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientSetupRule.java
new file mode 100644
index 000000000..26ef0a895
--- /dev/null
+++ b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalClientSetupRule.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivesocket.local;
+
+import io.netty.util.internal.ThreadLocalRandom;
+import io.reactivesocket.ConnectionSetupHandler;
+import io.reactivesocket.ConnectionSetupPayload;
+import io.reactivesocket.ReactiveSocket;
+import io.reactivesocket.RequestHandler;
+import io.reactivesocket.local.client.LocalReactiveSocketConnector;
+import io.reactivesocket.local.server.LocalReactiveSocketServer;
+import io.reactivesocket.test.ClientSetupRule;
+import io.reactivesocket.test.TestRequestHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.netty.handler.logging.LogLevel.*;
+
+public class LocalClientSetupRule extends ClientSetupRule {
+
+ private static final Logger logger = LoggerFactory.getLogger(LocalClientSetupRule.class);
+
+ public LocalClientSetupRule() {
+ super(LocalReactiveSocketConnector.create(ConnectionSetupPayload.create("", ""), Throwable::printStackTrace)
+ .configureClient(client -> client.enableWireLogging("test-client", DEBUG)),
+ () -> {
+ String id = "test-" + ThreadLocalRandom.current().nextInt(100);
+ logger.info("Local socket id: " + id);
+
+ return LocalReactiveSocketServer.create(id)
+ .configureServer(server -> server.enableWireLogging(id, DEBUG))
+ .start(new ConnectionSetupHandler() {
+ @Override
+ public RequestHandler apply(ConnectionSetupPayload s, ReactiveSocket rs) {
+ return new TestRequestHandler();
+ }
+ }).getServerAddress();
+ });
+ }
+
+}
diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalPing.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalPing.java
new file mode 100644
index 000000000..83ea466c2
--- /dev/null
+++ b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/LocalPing.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.reactivesocket.local;
+
+import io.netty.channel.local.LocalAddress;
+import io.reactivesocket.ConnectionSetupPayload;
+import io.reactivesocket.local.client.LocalReactiveSocketConnector;
+import io.reactivesocket.local.server.LocalReactiveSocketServer;
+import io.reactivesocket.test.PingClient;
+import io.reactivesocket.test.PingHandler;
+import org.HdrHistogram.Recorder;
+
+import java.util.concurrent.TimeUnit;
+
+public final class LocalPing {
+
+ public static final String SERVER_ID = "local-pong";
+
+ public static void main(String... args) throws Exception {
+
+ LocalReactiveSocketServer.create(SERVER_ID)
+ .start(new PingHandler());
+
+ ConnectionSetupPayload payload = ConnectionSetupPayload.create("", "");
+ LocalReactiveSocketConnector connector = LocalReactiveSocketConnector.create(payload,
+ Throwable::printStackTrace);
+ PingClient pingClient = new PingClient(connector);
+ Recorder recorder = pingClient.startTracker(1, TimeUnit.SECONDS);
+ final int count = 1_000_000;
+ pingClient.connect(new LocalAddress(SERVER_ID))
+ .startPingPong(count, recorder)
+ .doOnTerminate(() -> {
+ System.out.println("Sent " + count + " messages.");
+ })
+ .toBlocking()
+ .last();
+ }
+}
diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ObserverSubscriber.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ObserverSubscriber.java
deleted file mode 100644
index c4872e734..000000000
--- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/ObserverSubscriber.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package io.reactivesocket.transport.tcp;
-
-import io.reactivesocket.Frame;
-import io.reactivesocket.rx.Observer;
-import rx.Subscriber;
-
-public class ObserverSubscriber extends Subscriber {
-
- private final Observer o;
-
- public ObserverSubscriber(Observer o) {
- this.o = o;
- }
-
- @Override
- public void onCompleted() {
- o.onComplete();
- }
-
- @Override
- public void onError(Throwable e) {
- o.onError(e);
- }
-
- @Override
- public void onNext(Frame frame) {
- o.onNext(frame);
- }
-}
diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java
index 53e9cb298..6c6ed20e5 100644
--- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java
+++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/TcpDuplexConnection.java
@@ -1,18 +1,14 @@
/*
* Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package io.reactivesocket.transport.tcp;
@@ -21,6 +17,7 @@
import io.reactivesocket.internal.rx.BooleanDisposable;
import io.reactivesocket.rx.Completable;
import io.reactivesocket.rx.Observable;
+import io.reactivesocket.util.ObserverSubscriber;
import io.reactivex.netty.channel.Connection;
import org.reactivestreams.Publisher;
import rx.RxReactiveStreams;
diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java
index ac647eda2..3c8f4184b 100644
--- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java
+++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpReactiveSocketConnector.java
@@ -1,3 +1,16 @@
+/*
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
package io.reactivesocket.transport.tcp.client;
import io.netty.buffer.ByteBuf;
@@ -31,10 +44,10 @@ public class TcpReactiveSocketConnector implements ReactiveSocketConnector
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package io.reactivesocket.transport.tcp.server;
@@ -35,6 +31,7 @@
import rx.Observable;
import java.net.SocketAddress;
+import java.util.function.Function;
public class TcpReactiveSocketServer {
@@ -87,6 +84,18 @@ public void error(Throwable e) {
return new StartedServer();
}
+ /**
+ * Configures the underlying server using the passed {@code configurator}.
+ *
+ * @param configurator Function to transform the underlying server.
+ *
+ * @return New instance of {@code TcpReactiveSocketServer}.
+ */
+ public TcpReactiveSocketServer configureServer(
+ Function
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package io.reactivesocket.transport.tcp;
-import io.reactivesocket.Payload;
-import io.reactivesocket.test.TestUtil;
+import io.reactivesocket.test.ClientSetupRule;
import org.junit.Rule;
import org.junit.Test;
-import rx.Observable;
-import rx.observers.TestSubscriber;
-
-import java.util.concurrent.TimeUnit;
-
-import static io.reactivesocket.test.TestUtil.*;
-import static rx.RxReactiveStreams.*;
public class ClientServerTest {
@Rule
- public final ClientSetupRule setup = new ClientSetupRule();
+ public final ClientSetupRule setup = new TcpClientSetupRule();
@Test(timeout = 60000)
public void testRequestResponse1() {
- requestResponseN(1500, 1);
+ setup.testRequestResponseN(1);
}
@Test(timeout = 60000)
public void testRequestResponse10() {
- requestResponseN(1500, 10);
+ setup.testRequestResponseN(10);
}
@Test(timeout = 60000)
public void testRequestResponse100() {
- requestResponseN(1500, 100);
+ setup.testRequestResponseN(100);
}
@Test(timeout = 60000)
public void testRequestResponse10_000() {
- requestResponseN(60_000, 10_000);
+ setup.testRequestResponseN(10_000);
}
@Test(timeout = 60000)
public void testRequestStream() {
- TestSubscriber
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivesocket.transport.tcp;
+
+import io.reactivesocket.ConnectionSetupHandler;
+import io.reactivesocket.ConnectionSetupPayload;
+import io.reactivesocket.ReactiveSocket;
+import io.reactivesocket.RequestHandler;
+import io.reactivesocket.test.ClientSetupRule;
+import io.reactivesocket.test.TestRequestHandler;
+import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector;
+import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer;
+
+import static io.netty.handler.logging.LogLevel.*;
+
+public class TcpClientSetupRule extends ClientSetupRule {
+
+ public TcpClientSetupRule() {
+ super(TcpReactiveSocketConnector.create(ConnectionSetupPayload.create("", ""), Throwable::printStackTrace)
+ .configureClient(client -> client.enableWireLogging("test-client",
+ DEBUG)),
+ () -> {
+ return TcpReactiveSocketServer.create(0)
+ .configureServer(server -> server.enableWireLogging("test-server", DEBUG))
+ .start(new ConnectionSetupHandler() {
+ @Override
+ public RequestHandler apply(ConnectionSetupPayload s, ReactiveSocket rs) {
+ return new TestRequestHandler();
+ }
+ }).getServerAddress();
+ });
+ }
+
+}
diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPing.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPing.java
new file mode 100644
index 000000000..0eb07e4ed
--- /dev/null
+++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPing.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.reactivesocket.transport.tcp;
+
+import io.reactivesocket.ConnectionSetupPayload;
+import io.reactivesocket.test.PingClient;
+import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector;
+import org.HdrHistogram.Recorder;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+public final class TcpPing {
+
+ public static void main(String... args) throws Exception {
+ ConnectionSetupPayload payload = ConnectionSetupPayload.create("", "");
+ TcpReactiveSocketConnector connector = TcpReactiveSocketConnector.create(payload, Throwable::printStackTrace);
+ PingClient pingClient = new PingClient(connector);
+ Recorder recorder = pingClient.startTracker(1, TimeUnit.SECONDS);
+ final int count = 1_000_000;
+ pingClient.connect(new InetSocketAddress("localhost", 7878))
+ .startPingPong(count, recorder)
+ .doOnTerminate(() -> {
+ System.out.println("Sent " + count + " messages.");
+ })
+ .toBlocking()
+ .last();
+ }
+}
diff --git a/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPongServer.java b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPongServer.java
new file mode 100644
index 000000000..7ed25b2ca
--- /dev/null
+++ b/reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/TcpPongServer.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package io.reactivesocket.transport.tcp;
+
+import io.reactivesocket.test.PingHandler;
+import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer;
+
+public final class TcpPongServer {
+
+ public static void main(String... args) throws Exception {
+ TcpReactiveSocketServer.create(7878)
+ .start(new PingHandler())
+ .awaitShutdown();
+ }
+}