Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.reactivesocket.util;

import io.reactivesocket.Frame;
import io.reactivesocket.rx.Observer;
import rx.Subscriber;

public class ObserverSubscriber extends Subscriber<Frame> {

private final Observer<Frame> o;

public ObserverSubscriber(Observer<Frame> o) {
this.o = o;
}

@Override
public void onCompleted() {
o.onComplete();
}

@Override
public void onError(Throwable e) {
o.onError(e);
}

@Override
public void onNext(Frame frame) {
o.onNext(frame);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.reactivesocket.util;

import io.reactivesocket.Frame;
import io.reactivesocket.Payload;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;

/**
* An implementation of {@link Payload}
*/
public class PayloadImpl implements Payload {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This being available is very helpful


private final ByteBuffer data;
private final ByteBuffer metadata;

public PayloadImpl(String data) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest dropping the String constructors as you already have the static factory methods. These 4 constructors make it seem like Payload carries Strings or byte[]+Charset

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yschimke are you referring to the TestUtil static methods? This class doesn't have static factory methods.

These 4 constructors make it seem like Payload carries Strings or byte[]+Charset

hmm.. not sure I agree. It is just different ways of constructing the payload with metadata being optional. Speaking of which, may be we should have a NULL_PAYLOAD constant.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I misread the private static methods for a single String.

this(data, (String) null);
}

public PayloadImpl(String data, String metadata) {
this(fromString(data), fromString(metadata));
}

public PayloadImpl(String data, Charset charset) {
this(fromString(data, charset), fromString(null));
}

public PayloadImpl(String data, Charset dataCharset, String metadata, Charset metaDataCharset) {
this(fromString(data, dataCharset), fromString(metadata, metaDataCharset));
}

public PayloadImpl(byte[] data) {
this(ByteBuffer.wrap(data), Frame.NULL_BYTEBUFFER);
}

public PayloadImpl(byte[] data, byte[] metadata) {
this(ByteBuffer.wrap(data), ByteBuffer.wrap(metadata));
}

public PayloadImpl(ByteBuffer data) {
this(data, Frame.NULL_BYTEBUFFER);
}

public PayloadImpl(ByteBuffer data, ByteBuffer metadata) {
this.data = data;
this.metadata = null == metadata ? Frame.NULL_BYTEBUFFER : metadata;
}

@Override
public ByteBuffer getData() {
return data;
}

@Override
public ByteBuffer getMetadata() {
return metadata;
}

private static ByteBuffer fromString(String data) {
return fromString(data, Charset.defaultCharset());
}

private static ByteBuffer fromString(String data, Charset charset) {
return data == null ? Frame.NULL_BYTEBUFFER : ByteBuffer.wrap(data.getBytes(charset));
}
}
15 changes: 15 additions & 0 deletions reactivesocket-test/build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

dependencies {
compile project(':reactivesocket-core')
compile 'junit:junit:4.12'
compile 'org.mockito:mockito-core:1.10.19'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.reactivesocket.test;

import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.ReactiveSocketConnector;
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.reactivestreams.Publisher;
import rx.Observable;
import rx.functions.Func0;
import rx.observers.TestSubscriber;

import java.net.SocketAddress;
import java.util.function.Function;

import static io.reactivesocket.test.TestUtil.*;
import static rx.RxReactiveStreams.*;

public class ClientSetupRule extends ExternalResource {

private final ReactiveSocketConnector<SocketAddress> client;
private final Func0<SocketAddress> serverStarter;
private SocketAddress serverAddress;
private ReactiveSocket reactiveSocket;

public ClientSetupRule(ReactiveSocketConnector<SocketAddress> connector, Func0<SocketAddress> serverStarter) {
client = connector;
this.serverStarter = serverStarter;
}

@Override
public Statement apply(final Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
serverAddress = serverStarter.call();
reactiveSocket = toObservable(client.connect(serverAddress)).toSingle().toBlocking().value();

base.evaluate();
}
};
}

public ReactiveSocketConnector<SocketAddress> getClient() {
return client;
}

public SocketAddress getServerAddress() {
return serverAddress;
}

public ReactiveSocket getReactiveSocket() {
return reactiveSocket;
}

public void testRequestResponseN(int count) {
TestSubscriber<String> ts = TestSubscriber.create();
Observable
.range(1, count)
.flatMap(i -> toObservable(getReactiveSocket().requestResponse(utf8EncodedPayload("hello", "metadata")))
.map(payload -> byteToString(payload.getData()))
)
.doOnError(Throwable::printStackTrace)
.subscribe(ts);

ts.awaitTerminalEvent();
ts.assertValueCount(count);
ts.assertNoErrors();
ts.assertCompleted();
}

public void testRequestSubscription() {
_testStream(
socket -> toPublisher(toObservable(socket.requestSubscription(utf8EncodedPayload("hello", "metadata")))
.take(10)));
}

public void testRequestStream() {
_testStream(socket -> socket.requestStream(utf8EncodedPayload("hello", "metadata")));
}

private void _testStream(Function<ReactiveSocket, Publisher<Payload>> invoker) {
TestSubscriber<Payload> ts = TestSubscriber.create();
Publisher<Payload> publisher = invoker.apply(reactiveSocket);
toObservable(publisher).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertValueCount(10);
ts.assertNoErrors();
ts.assertCompleted();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.reactivesocket.test;

import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.ReactiveSocketConnector;
import io.reactivesocket.util.PayloadImpl;
import org.HdrHistogram.Recorder;
import rx.Observable;
import rx.RxReactiveStreams;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;

public class PingClient {

private final ReactiveSocketConnector<SocketAddress> connector;
private final String request;
private ReactiveSocket reactiveSocket;

public PingClient(ReactiveSocketConnector<SocketAddress> connector) {
this.connector = connector;
request = "hello";
}

public PingClient connect(SocketAddress address) {
if (null == reactiveSocket) {
reactiveSocket = RxReactiveStreams.toObservable(connector.connect(address))
.toSingle()
.toBlocking()
.value();
}
return this;
}

public Recorder startTracker(long interval, TimeUnit timeUnit) {
final Recorder histogram = new Recorder(3600000000000L, 3);
Observable.interval(interval, timeUnit)
.forEach(aLong -> {
System.out.println("---- PING/ PONG HISTO ----");
histogram.getIntervalHistogram()
.outputPercentileDistribution(System.out, 5, 1000.0, false);
System.out.println("---- PING/ PONG HISTO ----");
});
return histogram;
}

public Observable<Payload> startPingPong(int count, final Recorder histogram) {
connect(new InetSocketAddress("localhost", 7878));
return Observable.range(1, count)
.flatMap(i -> {
long start = System.nanoTime();
return RxReactiveStreams.toObservable(reactiveSocket.requestResponse(new PayloadImpl(request)))
.doOnTerminate(() -> {
long diff = System.nanoTime() - start;
histogram.recordValue(diff);
});
}, 16)
.doOnError(Throwable::printStackTrace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.reactivesocket.test;

import io.reactivesocket.ConnectionSetupHandler;
import io.reactivesocket.ConnectionSetupPayload;
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.RequestHandler;
import io.reactivesocket.exceptions.SetupException;
import io.reactivesocket.util.PayloadImpl;
import rx.Observable;
import rx.RxReactiveStreams;

import java.util.concurrent.ThreadLocalRandom;

public class PingHandler implements ConnectionSetupHandler {

private final byte[] pong;

public PingHandler() {
pong = new byte[1024];
ThreadLocalRandom.current().nextBytes(pong);
}

public PingHandler(byte[] pong) {
this.pong = pong;
}

@Override
public RequestHandler apply(ConnectionSetupPayload setupPayload, ReactiveSocket reactiveSocket)
throws SetupException {
return new RequestHandler.Builder()
.withRequestResponse(payload -> {
Payload responsePayload = new PayloadImpl(pong);
return RxReactiveStreams.toPublisher(Observable.just(responsePayload));
})
.build();
}
}
Loading