Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,24 @@
import org.reactivestreams.Subscription;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

public class TestingReactiveSocket implements ReactiveSocket {
private final Function<Payload, Payload> responder;

private final AtomicInteger count;
private final BiFunction<Subscriber<? super Payload>, Payload, Boolean> eachPayloadHandler;

public TestingReactiveSocket(Function<Payload, Payload> responder) {
this.responder = responder;
this((subscriber, payload) -> {
subscriber.onNext(responder.apply(payload));
return true;
});
}

public TestingReactiveSocket(BiFunction<Subscriber<? super Payload>, Payload, Boolean> eachPayloadHandler) {
this.eachPayloadHandler = eachPayloadHandler;
this.count = new AtomicInteger(0);
}

Expand All @@ -37,7 +46,7 @@ public Publisher<Void> fireAndForget(Payload payload) {
public Publisher<Payload> requestResponse(Payload payload) {
return subscriber ->
subscriber.onSubscribe(new Subscription() {
boolean cancelled = false;
boolean cancelled;

@Override
public void request(long n) {
Expand All @@ -46,9 +55,9 @@ public void request(long n) {
}
try {
count.incrementAndGet();
Payload response = responder.apply(payload);
subscriber.onNext(response);
subscriber.onComplete();
if (eachPayloadHandler.apply(subscriber, payload)) {
subscriber.onComplete();
}
} catch (Throwable t) {
subscriber.onError(t);
}
Expand Down Expand Up @@ -80,8 +89,7 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(Payload input) {
Payload response = responder.apply(input);
subscriber.onNext(response);
eachPayloadHandler.apply(subscriber, input);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.reactivesocket.Payload;
import io.reactivesocket.exceptions.TimeoutException;
import io.reactivesocket.client.filter.TimeoutSocket;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Subscriber;
Expand All @@ -24,17 +25,12 @@
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.*;

public class TimeoutFactoryTest {
@Test
public void testTimeoutSocket() {
TestingReactiveSocket socket = new TestingReactiveSocket(payload -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return payload;
});
TestingReactiveSocket socket = new TestingReactiveSocket((subscriber, payload) -> {return false;});
TimeoutSocket timeout = new TimeoutSocket(socket, 50, TimeUnit.MILLISECONDS);

timeout.requestResponse(new Payload() {
Expand All @@ -55,17 +51,17 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(Payload payload) {
Assert.assertTrue(false);
throw new AssertionError("onNext invoked when not expected.");
}

@Override
public void onError(Throwable t) {
Assert.assertTrue(t instanceof TimeoutException);
MatcherAssert.assertThat("Unexpected exception in onError", t, instanceOf(TimeoutException.class));
}

@Override
public void onComplete() {
Assert.assertTrue(false);
throw new AssertionError("onComplete invoked when not expected.");
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,10 @@
package io.reactivesocket.internal;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

abstract class CancellableSubscriber<T> implements Subscriber<T> {
public interface CancellableSubscriber<T> extends Subscriber<T> {

private Subscription s;
private boolean cancelled;
void cancel();

@Override
public void onSubscribe(Subscription s) {
boolean _cancel = false;
synchronized (this) {
this.s = s;
if (cancelled) {
_cancel = true;
}
}

if (_cancel) {
_unsafeCancel();
}
}

public void cancel() {
boolean _cancel = false;
synchronized (this) {
cancelled = true;
if (s != null) {
_cancel = true;
}
}

if (_cancel) {
_unsafeCancel();
}
}

protected void doAfterCancel() {
// NoOp by default.
}

private void _unsafeCancel() {
s.cancel();
doAfterCancel();
}
boolean isCancelled();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.reactivesocket.internal;

import org.reactivestreams.Subscription;

import java.util.function.Consumer;

final class CancellableSubscriberImpl<T> implements CancellableSubscriber<T> {

static final Consumer<Subscription> EMPTY_ON_SUBSCRIBE = new Consumer<Subscription>() {
@Override
public void accept(Subscription subscription) {
// No Op; empty
}
};

static final Consumer<Throwable> EMPTY_ON_ERROR = new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
// No Op; empty
}
};

static final Runnable EMPTY_RUNNABLE = new Runnable() {
@Override
public void run() {
// No Op; empty
}
};

private final Runnable onCancel;
private final Consumer<T> doOnNext;
private final Consumer<Throwable> doOnError;
private final Runnable doOnComplete;
private final Consumer<Subscription> doOnSubscribe;
private Subscription s;
private boolean done;
private boolean cancelled;
private boolean subscribed;

public CancellableSubscriberImpl(Consumer<Subscription> doOnSubscribe, Runnable doOnCancel, Consumer<T> doOnNext,
Consumer<Throwable> doOnError, Runnable doOnComplete) {
this.doOnSubscribe = doOnSubscribe;
onCancel = doOnCancel;
this.doOnNext = doOnNext;
this.doOnError = doOnError;
this.doOnComplete = doOnComplete;
}

public CancellableSubscriberImpl() {
this(EMPTY_ON_SUBSCRIBE, EMPTY_RUNNABLE, t -> {}, EMPTY_ON_ERROR, EMPTY_RUNNABLE);
}

@Override
public void onSubscribe(Subscription s) {

boolean _cancel = false;
synchronized (this) {
if (!subscribed) {
subscribed = true;
this.s = s;
if (cancelled) {
_cancel = true;
}
} else {
_cancel = true;
}
}

if (_cancel) {
s.cancel();
} else {
doOnSubscribe.accept(s);
}
}

@Override
public void cancel() {
boolean _cancel = false;
synchronized (this) {
if (s != null && !cancelled) {
_cancel = true;
}
cancelled = true;
done = true;
}

if (_cancel) {
_unsafeCancel();
}
}

@Override
public synchronized boolean isCancelled() {
return cancelled;
}

@Override
public void onNext(T t) {
if (canEmit()) {
doOnNext.accept(t);
}
}

@Override
public void onError(Throwable t) {
if (!terminate()) {
doOnError.accept(t);
}
}

@Override
public void onComplete() {
if (!terminate()) {
doOnComplete.run();
}
}

static <T> Consumer<T> emptyOnNext() {
return t -> {};
}

private synchronized boolean terminate() {
boolean oldDone = done;
done = true;
return oldDone;
}

private synchronized boolean canEmit() {
return !done;
}

private void _unsafeCancel() {
s.cancel();
onCancel.run();
}
}
Loading