diff --git a/reactivesocket-client/src/test/java/io/reactivesocket/client/TestingReactiveSocket.java b/reactivesocket-client/src/test/java/io/reactivesocket/client/TestingReactiveSocket.java index 6edd1f6fc..3847ecb58 100644 --- a/reactivesocket-client/src/test/java/io/reactivesocket/client/TestingReactiveSocket.java +++ b/reactivesocket-client/src/test/java/io/reactivesocket/client/TestingReactiveSocket.java @@ -9,15 +9,24 @@ import org.reactivestreams.Subscription; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; public class TestingReactiveSocket implements ReactiveSocket { - private final Function responder; + private final AtomicInteger count; + private final BiFunction, Payload, Boolean> eachPayloadHandler; public TestingReactiveSocket(Function responder) { - this.responder = responder; + this((subscriber, payload) -> { + subscriber.onNext(responder.apply(payload)); + return true; + }); + } + + public TestingReactiveSocket(BiFunction, Payload, Boolean> eachPayloadHandler) { + this.eachPayloadHandler = eachPayloadHandler; this.count = new AtomicInteger(0); } @@ -37,7 +46,7 @@ public Publisher fireAndForget(Payload payload) { public Publisher requestResponse(Payload payload) { return subscriber -> subscriber.onSubscribe(new Subscription() { - boolean cancelled = false; + boolean cancelled; @Override public void request(long n) { @@ -46,9 +55,9 @@ public void request(long n) { } try { count.incrementAndGet(); - Payload response = responder.apply(payload); - subscriber.onNext(response); - subscriber.onComplete(); + if (eachPayloadHandler.apply(subscriber, payload)) { + subscriber.onComplete(); + } } catch (Throwable t) { subscriber.onError(t); } @@ -80,8 +89,7 @@ public void onSubscribe(Subscription s) { @Override public void onNext(Payload input) { - Payload response = responder.apply(input); - subscriber.onNext(response); + eachPayloadHandler.apply(subscriber, input); } @Override diff --git a/reactivesocket-client/src/test/java/io/reactivesocket/client/TimeoutFactoryTest.java b/reactivesocket-client/src/test/java/io/reactivesocket/client/TimeoutFactoryTest.java index cf8f98fb8..9cb1bf5cd 100644 --- a/reactivesocket-client/src/test/java/io/reactivesocket/client/TimeoutFactoryTest.java +++ b/reactivesocket-client/src/test/java/io/reactivesocket/client/TimeoutFactoryTest.java @@ -16,6 +16,7 @@ import io.reactivesocket.Payload; import io.reactivesocket.exceptions.TimeoutException; import io.reactivesocket.client.filter.TimeoutSocket; +import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Test; import org.reactivestreams.Subscriber; @@ -24,17 +25,12 @@ import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +import static org.hamcrest.Matchers.*; + public class TimeoutFactoryTest { @Test public void testTimeoutSocket() { - TestingReactiveSocket socket = new TestingReactiveSocket(payload -> { - try { - Thread.sleep(200); - } catch (InterruptedException e) { - e.printStackTrace(); - } - return payload; - }); + TestingReactiveSocket socket = new TestingReactiveSocket((subscriber, payload) -> {return false;}); TimeoutSocket timeout = new TimeoutSocket(socket, 50, TimeUnit.MILLISECONDS); timeout.requestResponse(new Payload() { @@ -55,17 +51,17 @@ public void onSubscribe(Subscription s) { @Override public void onNext(Payload payload) { - Assert.assertTrue(false); + throw new AssertionError("onNext invoked when not expected."); } @Override public void onError(Throwable t) { - Assert.assertTrue(t instanceof TimeoutException); + MatcherAssert.assertThat("Unexpected exception in onError", t, instanceOf(TimeoutException.class)); } @Override public void onComplete() { - Assert.assertTrue(false); + throw new AssertionError("onComplete invoked when not expected."); } }); } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/CancellableSubscriber.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/CancellableSubscriber.java index af01a7fdb..04da03e83 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/CancellableSubscriber.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/CancellableSubscriber.java @@ -14,48 +14,10 @@ package io.reactivesocket.internal; import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -abstract class CancellableSubscriber implements Subscriber { +public interface CancellableSubscriber extends Subscriber { - private Subscription s; - private boolean cancelled; + void cancel(); - @Override - public void onSubscribe(Subscription s) { - boolean _cancel = false; - synchronized (this) { - this.s = s; - if (cancelled) { - _cancel = true; - } - } - - if (_cancel) { - _unsafeCancel(); - } - } - - public void cancel() { - boolean _cancel = false; - synchronized (this) { - cancelled = true; - if (s != null) { - _cancel = true; - } - } - - if (_cancel) { - _unsafeCancel(); - } - } - - protected void doAfterCancel() { - // NoOp by default. - } - - private void _unsafeCancel() { - s.cancel(); - doAfterCancel(); - } + boolean isCancelled(); } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/CancellableSubscriberImpl.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/CancellableSubscriberImpl.java new file mode 100644 index 000000000..e268b332f --- /dev/null +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/CancellableSubscriberImpl.java @@ -0,0 +1,149 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import org.reactivestreams.Subscription; + +import java.util.function.Consumer; + +final class CancellableSubscriberImpl implements CancellableSubscriber { + + static final Consumer EMPTY_ON_SUBSCRIBE = new Consumer() { + @Override + public void accept(Subscription subscription) { + // No Op; empty + } + }; + + static final Consumer EMPTY_ON_ERROR = new Consumer() { + @Override + public void accept(Throwable throwable) { + // No Op; empty + } + }; + + static final Runnable EMPTY_RUNNABLE = new Runnable() { + @Override + public void run() { + // No Op; empty + } + }; + + private final Runnable onCancel; + private final Consumer doOnNext; + private final Consumer doOnError; + private final Runnable doOnComplete; + private final Consumer doOnSubscribe; + private Subscription s; + private boolean done; + private boolean cancelled; + private boolean subscribed; + + public CancellableSubscriberImpl(Consumer doOnSubscribe, Runnable doOnCancel, Consumer doOnNext, + Consumer doOnError, Runnable doOnComplete) { + this.doOnSubscribe = doOnSubscribe; + onCancel = doOnCancel; + this.doOnNext = doOnNext; + this.doOnError = doOnError; + this.doOnComplete = doOnComplete; + } + + public CancellableSubscriberImpl() { + this(EMPTY_ON_SUBSCRIBE, EMPTY_RUNNABLE, t -> {}, EMPTY_ON_ERROR, EMPTY_RUNNABLE); + } + + @Override + public void onSubscribe(Subscription s) { + + boolean _cancel = false; + synchronized (this) { + if (!subscribed) { + subscribed = true; + this.s = s; + if (cancelled) { + _cancel = true; + } + } else { + _cancel = true; + } + } + + if (_cancel) { + s.cancel(); + } else { + doOnSubscribe.accept(s); + } + } + + @Override + public void cancel() { + boolean _cancel = false; + synchronized (this) { + if (s != null && !cancelled) { + _cancel = true; + } + cancelled = true; + done = true; + } + + if (_cancel) { + _unsafeCancel(); + } + } + + @Override + public synchronized boolean isCancelled() { + return cancelled; + } + + @Override + public void onNext(T t) { + if (canEmit()) { + doOnNext.accept(t); + } + } + + @Override + public void onError(Throwable t) { + if (!terminate()) { + doOnError.accept(t); + } + } + + @Override + public void onComplete() { + if (!terminate()) { + doOnComplete.run(); + } + } + + static Consumer emptyOnNext() { + return t -> {}; + } + + private synchronized boolean terminate() { + boolean oldDone = done; + done = true; + return oldDone; + } + + private synchronized boolean canEmit() { + return !done; + } + + private void _unsafeCancel() { + s.cancel(); + onCancel.run(); + } +} diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Publishers.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Publishers.java index 1ba78eeca..e1f3a5aa4 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Publishers.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Publishers.java @@ -20,9 +20,12 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Function; +import static io.reactivesocket.internal.CancellableSubscriberImpl.*; + /** * A set of utility functions for applying function composition over {@link Publisher}s. */ @@ -49,7 +52,14 @@ private Publishers() { */ public static Publisher map(Publisher source, Function map) { return subscriber -> { - source.subscribe(new MapSubscriber<>(subscriber, map)); + source.subscribe(Subscribers.create(subscription -> { + subscriber.onSubscribe(subscription); + }, t -> { + R r = map.apply(t); + subscriber.onNext(r); + }, throwable -> { + subscriber.onError(throwable); + }, () -> subscriber.onComplete(), EMPTY_RUNNABLE)); }; } @@ -66,52 +76,29 @@ public static Publisher map(Publisher source, Function map) { */ public static Publisher timeout(Publisher source, Publisher timeoutSignal) { return s -> { - source.subscribe(new SafeCancellableSubscriberProxy(s) { - - private Runnable timeoutCancellation; - private boolean emitted; - - @Override - public void onSubscribe(Subscription s) { - timeoutCancellation = afterTerminate(timeoutSignal, () -> { - boolean _cancel = true; - synchronized (this) { - _cancel = !emitted; - } - if (_cancel) { - onError(TIMEOUT_EXCEPTION); - cancel(); - } + final AtomicReference timeoutCancellation = new AtomicReference<>(); + CancellableSubscriber sub = Subscribers.create( + subscription -> { + timeoutCancellation.set(afterTerminate(timeoutSignal, () -> { + s.onError(TIMEOUT_EXCEPTION); + })); + s.onSubscribe(subscription); + }, + t -> { + timeoutCancellation.get().run(); + s.onNext(t); + }, + throwable -> { + timeoutCancellation.get().run(); + s.onError(throwable); + }, + () -> { + timeoutCancellation.get().run(); + s.onComplete(); + }, () -> { + timeoutCancellation.get().run(); }); - super.onSubscribe(s); - } - - @Override - protected void doOnNext(T t) { - synchronized (this) { - emitted = true; - } - timeoutCancellation.run(); // Cancel the timeout since we have received one item. - super.doOnNext(t); - } - - @Override - protected void doOnError(Throwable t) { - timeoutCancellation.run(); - super.doOnError(t); - } - - @Override - protected void doOnComplete() { - timeoutCancellation.run(); - super.doOnComplete(); - } - - @Override - protected void doAfterCancel() { - timeoutCancellation.run(); - } - }); + source.subscribe(sub); }; } @@ -142,36 +129,26 @@ public static Publisher timer(ScheduledExecutorService scheduler, long int */ public static Publisher concatEmpty(Publisher first, Publisher second) { return subscriber -> { - first.subscribe(new SafeCancellableSubscriberProxy(subscriber) { - @Override - protected void doOnComplete() { - second.subscribe(new SafeCancellableSubscriber() { - @Override - public void onSubscribe(Subscription s) { - super.onSubscribe(s); - // This is the second subscription which isn't driven by downstream subscriber. - // So, no onSubscriber callback will be coming here (alread done for first subscriber). - // As we are only dealing with empty (Void) sources, this doesn't break backpressure. - s.request(1); - } - - @Override - protected void doOnNext(Void aVoid) { - subscriber.onNext(aVoid); - } - - @Override - protected void doOnError(Throwable t) { - subscriber.onError(t); - } - - @Override - protected void doOnComplete() { - subscriber.onComplete(); - } - }); - } - }); + first.subscribe(Subscribers.create(subscription -> { + subscriber.onSubscribe(subscription); + }, t -> { + subscriber.onNext(t); + }, throwable -> { + subscriber.onError(throwable); + }, () -> { + second.subscribe(Subscribers.create(subscription -> { + // This is the second subscription which isn't driven by downstream subscriber. + // So, no onSubscriber callback will be coming here (alread done for first subscriber). + // As we are only dealing with empty (Void) sources, this doesn't break backpressure. + subscription.request(1); + }, t -> { + subscriber.onNext(t); + }, throwable -> { + subscriber.onError(throwable); + }, () -> { + subscriber.onComplete(); + }, EMPTY_RUNNABLE)); + }, EMPTY_RUNNABLE)); }; } @@ -222,65 +199,33 @@ public static Publisher empty() { * @return Cancellation handle. */ public static Runnable afterTerminate(Publisher source, Runnable action) { - final CancellableSubscriber subscriber = new SafeCancellableSubscriber() { - @Override - public void doOnError(Throwable t) { - action.run(); - } - - @Override - public void doOnComplete() { - action.run(); - } - }; + final CancellableSubscriber subscriber = Subscribers.doOnTerminate(throwable -> action.run(), + () -> action.run()); source.subscribe(subscriber); return () -> subscriber.cancel(); } - private static class MapSubscriber extends SafeCancellableSubscriber { - private final Subscriber subscriber; - private final Function map; - - public MapSubscriber(Subscriber subscriber, Function map) { - this.subscriber = subscriber; - this.map = map; - } - - @Override - public void onSubscribe(Subscription s) { - Subscription s1 = new Subscription() { - @Override - public void request(long n) { - s.request(n); - } + private static final class TimeoutHolder implements Consumer, Runnable { - @Override - public void cancel() { - MapSubscriber.this.cancel(); - } - }; - super.onSubscribe(s1); - subscriber.onSubscribe(s1); - } + private final Publisher timeoutSignal; + private final Subscriber subscriber; + private Runnable timeoutCancellation; - @Override - protected void doOnNext(T t) { - try { - R r = map.apply(t); - subscriber.onNext(r); - } catch (Exception e) { - onError(e); - } + private TimeoutHolder(Publisher timeoutSignal, Subscriber subscriber) { + this.timeoutSignal = timeoutSignal; + this.subscriber = subscriber; } @Override - protected void doOnError(Throwable t) { - subscriber.onError(t); + public void run() { + timeoutCancellation.run(); } @Override - protected void doOnComplete() { - subscriber.onComplete(); + public void accept(Subscription subscription) { + timeoutCancellation = afterTerminate(timeoutSignal, () -> { + subscriber.onError(TIMEOUT_EXCEPTION); + }); } } } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriber.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriber.java deleted file mode 100644 index 6b89d9150..000000000 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriber.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2016 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.reactivesocket.internal; - -import org.reactivestreams.Subscription; - -import java.util.concurrent.atomic.AtomicBoolean; - -abstract class SafeCancellableSubscriber extends CancellableSubscriber { - - protected final AtomicBoolean subscribed = new AtomicBoolean(); - protected final AtomicBoolean done = new AtomicBoolean(); - - @Override - public void onSubscribe(Subscription s) { - if (subscribed.compareAndSet(false, true)) { - super.onSubscribe(s); - } else { - onError(new IllegalStateException("Duplicate subscription.")); - } - } - - @Override - public void onNext(T t) { - if (!done.get()) { - doOnNext(t); - } - } - - @Override - public void onError(Throwable t) { - if (done.compareAndSet(false, true)) { - doOnError(t); - super.cancel(); - } - } - - @Override - public void onComplete() { - if (done.compareAndSet(false, true)) { - doOnComplete(); - } - } - - @Override - public void cancel() { - if (done.compareAndSet(false, true)) { - super.cancel(); - } - } - - protected void doOnNext(T t) { - // NoOp by default - } - - protected void doOnError(Throwable t) { - // NoOp by default - } - - protected void doOnComplete() { - // NoOp by default - } -} diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriberProxy.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriberProxy.java deleted file mode 100644 index 357f4ab62..000000000 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriberProxy.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2016 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.reactivesocket.internal; - -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -abstract class SafeCancellableSubscriberProxy extends SafeCancellableSubscriber { - - private final Subscriber delegate; - - protected SafeCancellableSubscriberProxy(Subscriber delegate) { - this.delegate = delegate; - } - - @Override - public void onSubscribe(final Subscription s) { - Subscription s1 = new Subscription() { - @Override - public void request(long n) { - s.request(n); - } - - @Override - public void cancel() { - SafeCancellableSubscriberProxy.this.cancel(); - } - }; - super.onSubscribe(s1); - delegate.onSubscribe(s1); - } - - @Override - protected void doOnNext(T t) { - delegate.onNext(t); - } - - @Override - protected void doOnError(Throwable t) { - delegate.onError(t); - } - - @Override - protected void doOnComplete() { - delegate.onComplete(); - } -} diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Subscribers.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Subscribers.java new file mode 100644 index 000000000..2c7ade903 --- /dev/null +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Subscribers.java @@ -0,0 +1,201 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.function.Consumer; + +import static io.reactivesocket.internal.CancellableSubscriberImpl.*; + +/** + * A factory to create instances of {@link Subscriber} that follow reactive stream specifications. + */ +public final class Subscribers { + + private Subscribers() { + } + + /** + * Creates a new {@code Subscriber} instance that ignores all invocations but follows reactive streams specfication. + * + * @param Type parameter + * + * @return A new {@code Subscriber} instance. + */ + public static CancellableSubscriber empty() { + return new CancellableSubscriberImpl(); + } + + /** + * Creates a new {@code Subscriber} instance that ignores all invocations other than + * {@link Subscriber#onSubscribe(Subscription)} but follows reactive streams specfication. + * + * @param doOnSubscribe Callback for {@link Subscriber#onSubscribe(Subscription)} + * @param Type parameter + * + * @return A new {@code Subscriber} instance. + */ + public static CancellableSubscriber doOnSubscribe(Consumer doOnSubscribe) { + return new CancellableSubscriberImpl(doOnSubscribe, EMPTY_RUNNABLE, emptyOnNext(), EMPTY_ON_ERROR, + EMPTY_RUNNABLE); + } + + /** + * Creates a new {@code Subscriber} instance that ignores all invocations other than + * {@link CancellableSubscriber#cancel()} but follows reactive streams specfication. + * + * @param doOnCancel Callback for {@link CancellableSubscriber#cancel()} + * @param Type parameter + * + * @return A new {@code Subscriber} instance. + */ + public static CancellableSubscriber doOnCancel(Runnable doOnCancel) { + return new CancellableSubscriberImpl(EMPTY_ON_SUBSCRIBE, doOnCancel, emptyOnNext(), EMPTY_ON_ERROR, + EMPTY_RUNNABLE); + } + + /** + * Creates a new {@code Subscriber} instance that ignores all invocations other than + * {@link Subscriber#onSubscribe(Subscription)} and {@link CancellableSubscriber#cancel()} but follows reactive + * streams specfication. + * + * @param doOnSubscribe Callback for {@link Subscriber#onSubscribe(Subscription)} + * @param doOnCancel Callback for {@link CancellableSubscriber#cancel()} + * @param Type parameter + * + * @return A new {@code Subscriber} instance. + */ + public static CancellableSubscriber create(Consumer doOnSubscribe, Runnable doOnCancel) { + return new CancellableSubscriberImpl(doOnSubscribe, doOnCancel, emptyOnNext(), EMPTY_ON_ERROR, + EMPTY_RUNNABLE); + } + + /** + * Creates a new {@code Subscriber} instance that listens to callbacks for all methods and follows reactive streams + * specfication. + * + * @param doOnSubscribe Callback for {@link Subscriber#onSubscribe(Subscription)} + * @param doOnNext Callback for {@link Subscriber#onNext(Object)} + * @param doOnError Callback for {@link Subscriber#onError(Throwable)} + * @param doOnComplete Callback for {@link Subscriber#onComplete()} + * @param doOnCancel Callback for {@link CancellableSubscriber#cancel()} + * @param Type parameter + * + * @return A new {@code Subscriber} instance. + */ + public static CancellableSubscriber create(Consumer doOnSubscribe, Consumer doOnNext, + Consumer doOnError, Runnable doOnComplete, + Runnable doOnCancel) { + return new CancellableSubscriberImpl(doOnSubscribe, doOnCancel, doOnNext, doOnError, doOnComplete); + } + + /** + * Creates a new {@code Subscriber} instance that ignores all invocations other than + * {@link Subscriber#onError(Throwable)} but follows reactive streams specfication. + * + * @param doOnError Callback for {@link Subscriber#onError(Throwable)} + * @param Type parameter + * + * @return A new {@code Subscriber} instance. + */ + public static CancellableSubscriber doOnError(Consumer doOnError) { + return new CancellableSubscriberImpl(EMPTY_ON_SUBSCRIBE, EMPTY_RUNNABLE, emptyOnNext(), doOnError, + EMPTY_RUNNABLE); + } + + /** + * Creates a new {@code Subscriber} instance that ignores all invocations other than + * {@link Subscriber#onComplete()}, {@link Subscriber#onError(Throwable)} but follows reactive streams specfication. + * + * @param doOnComplete Callback for {@link Subscriber#onComplete()} + * @param doOnError Callback for {@link Subscriber#onError(Throwable)} + * @param Type parameter + * + * @return A new {@code Subscriber} instance. + */ + public static CancellableSubscriber doOnComplete(Runnable doOnComplete, Consumer doOnError) { + return new CancellableSubscriberImpl(EMPTY_ON_SUBSCRIBE, EMPTY_RUNNABLE, emptyOnNext(), doOnError, + doOnComplete); + } + + /** + * Creates a new {@code Subscriber} instance that ignores all invocations other than + * {@link Subscriber#onNext(Object)} and {@link Subscriber#onError(Throwable)}but follows reactive streams + * specfication. + * + * @param doOnNext Callback for {@link Subscriber#onNext(Object)} + * @param doOnError Callback for {@link Subscriber#onError(Throwable)} + * @param Type parameter + * + * @return A new {@code Subscriber} instance. + */ + public static CancellableSubscriber doOnNext(Consumer doOnNext, Consumer doOnError) { + return new CancellableSubscriberImpl(EMPTY_ON_SUBSCRIBE, EMPTY_RUNNABLE, doOnNext, doOnError, + EMPTY_RUNNABLE); + } + + /** + * Creates a new {@code Subscriber} instance that ignores all invocations other than + * {@link Subscriber#onSubscribe(Subscription)}, {@link Subscriber#onNext(Object)} and + * {@link Subscriber#onError(Throwable)} but follows reactive streams specfication. + * + * @param doOnSubscribe Callback for {@link Subscriber#onSubscribe(Subscription)} + * @param doOnNext Callback for {@link Subscriber#onNext(Object)} + * @param doOnError Callback for {@link Subscriber#onError(Throwable)} + * @param Type parameter + * + * @return A new {@code Subscriber} instance. + */ + public static CancellableSubscriber doOnNext(Consumer doOnSubscribe, Consumer doOnNext, + Consumer doOnError) { + return new CancellableSubscriberImpl(doOnSubscribe, EMPTY_RUNNABLE, doOnNext, doOnError, EMPTY_RUNNABLE); + } + + /** + * Creates a new {@code Subscriber} instance that ignores all invocations other than + * {@link Subscriber#onSubscribe(Subscription)}, {@link Subscriber#onNext(Object)}, + * {@link Subscriber#onError(Throwable)} and {@link Subscriber#onComplete()} but follows reactive streams + * specfication. + * + * @param doOnSubscribe Callback for {@link Subscriber#onSubscribe(Subscription)} + * @param doOnNext Callback for {@link Subscriber#onNext(Object)} + * @param doOnError Callback for {@link Subscriber#onError(Throwable)} + * @param doOnComplete Callback for {@link Subscriber#onComplete()} + * @param Type parameter + * + * @return A new {@code Subscriber} instance. + */ + public static CancellableSubscriber doOnNext(Consumer doOnSubscribe, Consumer doOnNext, + Consumer doOnError, Runnable doOnComplete) { + return new CancellableSubscriberImpl(doOnSubscribe, EMPTY_RUNNABLE, doOnNext, doOnError, doOnComplete); + } + + /** + * Creates a new {@code Subscriber} instance that ignores all invocations other than + * {@link Subscriber#onError(Throwable)} and {@link Subscriber#onComplete()} but follows reactive streams + * specfication. + * + * @param doOnError Callback for {@link Subscriber#onError(Throwable)} + * @param doOnComplete Callback for {@link Subscriber#onComplete()} + * @param Type parameter + * + * @return A new {@code Subscriber} instance. + */ + public static CancellableSubscriber doOnTerminate(Consumer doOnError, Runnable doOnComplete) { + return new CancellableSubscriberImpl(EMPTY_ON_SUBSCRIBE, EMPTY_RUNNABLE, emptyOnNext(), doOnError, + doOnComplete); + } +} diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Subscriptions.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Subscriptions.java new file mode 100644 index 000000000..4a9d2fb1c --- /dev/null +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Subscriptions.java @@ -0,0 +1,111 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import org.reactivestreams.Subscription; + +import java.util.function.LongConsumer; + +/** + * A factory for implementations of {@link Subscription} + */ +public final class Subscriptions { + + private static final Subscription EMPTY = new Subscription() { + @Override + public void request(long n) { + // No Op + } + + @Override + public void cancel() { + // No Op + } + }; + + private Subscriptions() { + // No instances. + } + + /** + * Empty {@code Subscription} i.e. it does nothing, all method implementations are no-op. + * + * @return An empty {@code Subscription}. This will be a shared instance. + */ + public static Subscription empty() { + return EMPTY; + } + + /** + * Creates a new {@code Subscription} object that invokes the passed {@code onCancelAction} when the subscription is + * cancelled. This will ignore {@link Subscription#request(long)} calls to the returned {@code Subscription} + * + * @return A new {@code Subscription} instance. + */ + public static Subscription forCancel(Runnable onCancelAction) { + return new Subscription() { + @Override + public void request(long n) { + // Do nothing. + } + + @Override + public void cancel() { + onCancelAction.run(); + } + }; + } + + /** + * Creates a new {@code Subscription} object that invokes the passed {@code requestN} consumer for every call to + * the returned {@link Subscription#request(long)} and ignores {@link Subscription#cancel()} calls to the returned + * {@code Subscription} + * + * @return A new {@code Subscription} instance. + */ + public static Subscription forRequestN(LongConsumer requestN) { + return new Subscription() { + @Override + public void request(long n) { + requestN.accept(n); + } + + @Override + public void cancel() { + // No op + } + }; + } + + /** + * Creates a new {@code Subscription} object that invokes the passed {@code requestN} consumer for every call to + * the returned {@link Subscription#request(long)} and {@code onCancelAction} for every call to the returned + * {@link Subscription#cancel()} + * + * @return A new {@code Subscription} instance. + */ + public static Subscription create(LongConsumer requestN, Runnable onCancelAction) { + return new Subscription() { + @Override + public void request(long n) { + requestN.accept(n); + } + + @Override + public void cancel() { + onCancelAction.run(); + } + }; + } +} diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/internal/SubscriberRule.java b/reactivesocket-core/src/test/java/io/reactivesocket/internal/SubscriberRule.java new file mode 100644 index 000000000..578ecfaa3 --- /dev/null +++ b/reactivesocket-core/src/test/java/io/reactivesocket/internal/SubscriberRule.java @@ -0,0 +1,129 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import io.reactivex.subscribers.TestSubscriber; +import org.hamcrest.MatcherAssert; +import org.junit.rules.ExternalResource; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; +import org.reactivestreams.Subscription; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.is; + +public class SubscriberRule extends ExternalResource { + + private Consumer doOnSubscribe; + private Consumer doOnError; + private Consumer doOnNext; + private Runnable doOnComplete; + private Runnable doOnCancel; + private TestSubscriber testSubscriber; + + private int doOnCancelCount; + private int doOnSubscribeCount; + private int doOnNextCount; + private int doOnErrorCount; + private int doOnCompleteCount; + + @Override + public Statement apply(final Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + testSubscriber = new TestSubscriber<>(); + doOnSubscribe = subscription -> { + doOnSubscribeCount++; + testSubscriber.onSubscribe(subscription); + }; + doOnCancel = () -> { + doOnCancelCount++; + }; + doOnNext = str -> { + doOnNextCount++; + testSubscriber.onNext(str); + }; + doOnError = throwable -> { + doOnErrorCount++; + testSubscriber.onError(throwable); + }; + doOnComplete = () -> { + doOnCompleteCount++; + testSubscriber.onComplete(); + }; + base.evaluate(); + } + }; + } + + public CancellableSubscriber subscribe() { + CancellableSubscriber subscriber = + Subscribers.create(doOnSubscribe, doOnNext, doOnError, doOnComplete, doOnCancel); + subscribe(subscriber); + return subscriber; + } + + public AtomicInteger subscribe(CancellableSubscriber subscriber) { + final AtomicInteger subscriptionCancel = new AtomicInteger(); + subscriber.onSubscribe(Subscriptions.forCancel(() -> subscriptionCancel.incrementAndGet())); + return subscriptionCancel; + } + + public void assertOnSubscribe(int count) { + MatcherAssert.assertThat("Unexpected onSubscriber invocation count.", doOnSubscribeCount, is(count)); + } + + public void assertOnCancel(int count) { + MatcherAssert.assertThat("Unexpected onCancel invocation count.", doOnCancelCount, is(count)); + } + + public void assertOnNext(int count) { + MatcherAssert.assertThat("Unexpected onNext invocation count.", doOnNextCount, is(count)); + } + + public void assertOnError(int count) { + MatcherAssert.assertThat("Unexpected onError invocation count.", doOnErrorCount, is(count)); + } + + public void assertOnComplete(int count) { + MatcherAssert.assertThat("Unexpected onComplete invocation count.", doOnCompleteCount, is(count)); + } + + public TestSubscriber getTestSubscriber() { + return testSubscriber; + } + + public Consumer getDoOnSubscribe() { + return doOnSubscribe; + } + + public Consumer getDoOnError() { + return doOnError; + } + + public Consumer getDoOnNext() { + return doOnNext; + } + + public Runnable getDoOnComplete() { + return doOnComplete; + } + + public Runnable getDoOnCancel() { + return doOnCancel; + } +} diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/internal/SubscribersCreateTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/internal/SubscribersCreateTest.java new file mode 100644 index 000000000..eb0eb3301 --- /dev/null +++ b/reactivesocket-core/src/test/java/io/reactivesocket/internal/SubscribersCreateTest.java @@ -0,0 +1,87 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import org.hamcrest.MatcherAssert; +import org.junit.Rule; +import org.junit.Test; + +import static org.hamcrest.Matchers.is; + +public class SubscribersCreateTest { + + @Rule + public final SubscriberRule rule = new SubscriberRule(); + + @Test(timeout = 10000) + public void testOnNext() throws Exception { + CancellableSubscriber subscriber = rule.subscribe(); + subscriber.onNext("Hello"); + rule.assertOnNext(1); + rule.getTestSubscriber().assertValue("Hello"); + } + + @Test(timeout = 10000) + public void testOnError() throws Exception { + CancellableSubscriber subscriber = rule.subscribe(); + subscriber.onNext("Hello"); + rule.assertOnNext(1); + rule.getTestSubscriber().assertValue("Hello"); + + subscriber.onError(new NullPointerException()); + rule.assertOnError(1); + rule.getTestSubscriber().assertError(NullPointerException.class); + } + + @Test(timeout = 10000) + public void testOnComplete() throws Exception { + CancellableSubscriber subscriber = rule.subscribe(); + subscriber.onNext("Hello"); + rule.assertOnNext(1); + rule.getTestSubscriber().assertValue("Hello"); + + subscriber.onComplete(); + rule.assertOnComplete(1); + rule.getTestSubscriber().assertComplete(); + } + + @Test(timeout = 10000) + public void testOnNextAfterComplete() throws Exception { + CancellableSubscriber subscriber = rule.subscribe(); + rule.assertOnSubscribe(1); + subscriber.onNext("Hello"); + rule.assertOnNext(1); + + subscriber.onComplete(); + rule.assertOnComplete(1); + + subscriber.onNext("Hello"); + rule.assertOnNext(1); + } + + @Test(timeout = 10000) + public void testOnNextAfterError() throws Exception { + CancellableSubscriber subscriber = rule.subscribe(); + rule.assertOnSubscribe(1); + subscriber.onNext("Hello"); + rule.assertOnNext(1); + + subscriber.onError(new NullPointerException()); + rule.assertOnError(1); + rule.getTestSubscriber().assertError(NullPointerException.class); + + subscriber.onNext("Hello"); + rule.assertOnNext(1); + } +} diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/internal/SubscribersDoOnSubscriberTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/internal/SubscribersDoOnSubscriberTest.java new file mode 100644 index 000000000..7332ca82f --- /dev/null +++ b/reactivesocket-core/src/test/java/io/reactivesocket/internal/SubscribersDoOnSubscriberTest.java @@ -0,0 +1,68 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import org.hamcrest.MatcherAssert; +import org.junit.Rule; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.*; + +public class SubscribersDoOnSubscriberTest { + + @Rule + public final SubscriberRule rule = new SubscriberRule(); + + @Test + public void testSubscribe() throws Exception { + CancellableSubscriber subscriber = Subscribers.create(rule.getDoOnSubscribe(), + rule.getDoOnCancel()); + AtomicInteger subscriptionCancelCount = rule.subscribe(subscriber); + rule.assertOnSubscribe(1); + subscriber.cancel(); + rule.assertOnCancel(1); + MatcherAssert.assertThat("Subscription not cancelled.", subscriptionCancelCount.get(), is(1)); + } + + @Test + public void testDuplicateSubscribe() throws Exception { + CancellableSubscriber subscriber = rule.subscribe(); + rule.assertOnSubscribe(1); + + AtomicBoolean secondCancellation = new AtomicBoolean(); + subscriber.onSubscribe(Subscriptions.forCancel(() -> secondCancellation.set(true))); + rule.assertOnSubscribe(1); + MatcherAssert.assertThat("Duplicate subscription not cancelled.", secondCancellation.get(), is(true)); + MatcherAssert.assertThat("Original subscription cancelled.", subscriber.isCancelled(), is(false)); + } + + @Test + public void testDuplicateCancel() throws Exception { + CancellableSubscriber subscriber = Subscribers.create(rule.getDoOnSubscribe(), + rule.getDoOnCancel()); + AtomicInteger subscriptionCancelCount = rule.subscribe(subscriber); + rule.assertOnSubscribe(1); + subscriber.cancel(); + rule.assertOnCancel(1); + MatcherAssert.assertThat("Subscription not cancelled.", subscriptionCancelCount.get(), is(1)); + + subscriber.cancel(); + rule.assertOnCancel(1); + rule.assertOnError(0); + } + +}