From 31e44c3dc776a7a47e298b5ebaa8c012d09d859d Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Tue, 5 Jul 2016 21:58:35 -0700 Subject: [PATCH] Publishers cleanup and new functions #### Problem `Publishers.retry()` had the following bugs: - Retry subscription was sending duplicate `onSubscribe` callbacks to the downstream subscribers. This is illegal according to spec. - Flow control was not properly managed if the error happens after a few emissions. Since we have removed retry functionality from the `ClientBuilder` there is no reason to maintain this code. `Publishers` was not wiring the downstream subscriber cancellation to the cancellation of the `Subscriber`. This means that a cancel followed by `onNext` would not have stopped the emission, which although not necessary (because of the inherent race between cancel and emissions) but is good to have. #### Modification - Removed `RetrySocket` and `Publishers.retry()` since they are not used. - Added (moved from `PublisherUtils`) `.empty()`, `.just()` and `.error()` to `Publishers` - Added `concatEmpty()` to `Publishers`. This is required in places where we concat `Publisher` in custom crafter code. (I will send another PR which is to use this operator in `ReactiveSocket.close()` - Added tests for functions in `Publishers`. --- .../client/filter/RetrySocket.java | 70 ----- .../io/reactivesocket/RequestHandler.java | 5 +- .../internal/CancellableSubscriber.java | 61 +++++ .../internal/PublisherUtils.java | 63 ----- .../reactivesocket/internal/Publishers.java | 248 +++++++----------- .../io/reactivesocket/internal/Responder.java | 12 +- .../internal/SafeCancellableSubscriber.java | 74 ++++++ .../SafeCancellableSubscriberProxy.java | 58 ++++ .../internal/SingleEmissionSubscription.java | 70 +++++ .../io/reactivesocket/ReactiveSocketPerf.java | 4 +- .../internal/PublishersConcatEmptyTest.java | 90 +++++++ .../internal/PublishersMapTest.java | 108 ++++++++ .../PublishersSingleEmissionsTest.java | 56 ++++ .../internal/PublishersTimeoutTest.java | 81 ++++++ 14 files changed, 709 insertions(+), 291 deletions(-) delete mode 100644 reactivesocket-client/src/main/java/io/reactivesocket/client/filter/RetrySocket.java create mode 100644 reactivesocket-core/src/main/java/io/reactivesocket/internal/CancellableSubscriber.java create mode 100644 reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriber.java create mode 100644 reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriberProxy.java create mode 100644 reactivesocket-core/src/main/java/io/reactivesocket/internal/SingleEmissionSubscription.java create mode 100644 reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersConcatEmptyTest.java create mode 100644 reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersMapTest.java create mode 100644 reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersSingleEmissionsTest.java create mode 100644 reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersTimeoutTest.java diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/RetrySocket.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/RetrySocket.java deleted file mode 100644 index 013b16035..000000000 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/filter/RetrySocket.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.client.filter; - -import io.reactivesocket.Payload; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.internal.Publishers; -import io.reactivesocket.util.ReactiveSocketProxy; -import org.reactivestreams.Publisher; - -import java.util.function.Function; - -public class RetrySocket extends ReactiveSocketProxy { - private final int retry; - private final Function retryThisException; - - public RetrySocket(ReactiveSocket child, int retry, Function retryThisException) { - super(child); - this.retry = retry; - this.retryThisException = retryThisException; - } - - @Override - public Publisher fireAndForget(Payload payload) { - return Publishers.retry(child.fireAndForget(payload), retry, retryThisException); - } - - @Override - public Publisher requestResponse(Payload payload) { - return Publishers.retry(child.requestResponse(payload), retry, retryThisException); - } - - @Override - public Publisher requestStream(Payload payload) { - return Publishers.retry(child.requestStream(payload), retry, retryThisException); - } - - @Override - public Publisher requestSubscription(Payload payload) { - return Publishers.retry(child.requestSubscription(payload), retry, retryThisException); - } - - @Override - public Publisher requestChannel(Publisher payload) { - return Publishers.retry(child.requestChannel(payload), retry, retryThisException); - } - - @Override - public Publisher metadataPush(Payload payload) { - return Publishers.retry(child.metadataPush(payload), retry, retryThisException); - } - - @Override - public String toString() { - return "RetrySocket(" + retry + ")->" + child; - } -} diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/RequestHandler.java b/reactivesocket-core/src/main/java/io/reactivesocket/RequestHandler.java index 8e51d90a6..4859bd62b 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/RequestHandler.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/RequestHandler.java @@ -13,6 +13,7 @@ package io.reactivesocket; import io.reactivesocket.internal.PublisherUtils; +import io.reactivesocket.internal.Publishers; import org.reactivestreams.Publisher; import java.util.function.Function; @@ -28,13 +29,13 @@ public abstract class RequestHandler { payload -> PublisherUtils.errorPayload(new RuntimeException("No 'requestSubscription' handler")); private static final Function> NO_FIRE_AND_FORGET_HANDLER = - payload -> PublisherUtils.errorVoid(new RuntimeException("No 'fireAndForget' handler")); + payload -> Publishers.error(new RuntimeException("No 'fireAndForget' handler")); private static final Function, Publisher> NO_REQUEST_CHANNEL_HANDLER = payloads -> PublisherUtils.errorPayload(new RuntimeException("No 'requestChannel' handler")); private static final Function> NO_METADATA_PUSH_HANDLER = - payload -> PublisherUtils.errorVoid(new RuntimeException("No 'metadataPush' handler")); + payload -> Publishers.error(new RuntimeException("No 'metadataPush' handler")); public abstract Publisher handleRequestResponse(final Payload payload); diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/CancellableSubscriber.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/CancellableSubscriber.java new file mode 100644 index 000000000..af01a7fdb --- /dev/null +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/CancellableSubscriber.java @@ -0,0 +1,61 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +abstract class CancellableSubscriber implements Subscriber { + + private Subscription s; + private boolean cancelled; + + @Override + public void onSubscribe(Subscription s) { + boolean _cancel = false; + synchronized (this) { + this.s = s; + if (cancelled) { + _cancel = true; + } + } + + if (_cancel) { + _unsafeCancel(); + } + } + + public void cancel() { + boolean _cancel = false; + synchronized (this) { + cancelled = true; + if (s != null) { + _cancel = true; + } + } + + if (_cancel) { + _unsafeCancel(); + } + } + + protected void doAfterCancel() { + // NoOp by default. + } + + private void _unsafeCancel() { + s.cancel(); + doAfterCancel(); + } +} diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/PublisherUtils.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/PublisherUtils.java index 6fc699220..77e1543cd 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/PublisherUtils.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/PublisherUtils.java @@ -105,69 +105,6 @@ public void cancel() { }; } - public static final Publisher errorVoid(Throwable e) { - return (Subscriber s) -> { - s.onSubscribe(new Subscription() { - - @Override - public void request(long n) { - } - - @Override - public void cancel() { - // ignoring as nothing to do - } - - }); - s.onError(e); - - }; - } - - public static final Publisher just(Frame frame) { - return (Subscriber s) -> { - s.onSubscribe(new Subscription() { - - boolean completed = false; - - @Override - public void request(long n) { - if (!completed && n > 0) { - completed = true; - s.onNext(frame); - s.onComplete(); - } - } - - @Override - public void cancel() { - // ignoring as nothing to do - } - - }); - - }; - } - - public static final Publisher empty() { - return (Subscriber s) -> { - s.onSubscribe(new Subscription() { - - @Override - public void request(long n) { - } - - @Override - public void cancel() { - // ignoring as nothing to do - } - - }); - s.onComplete(); // TODO confirm this is okay with ReactiveStream spec to send immediately after onSubscribe (I think so since no data is being sent so requestN doesn't matter) - }; - - } - public static final Publisher keepaliveTicker(final int interval, final TimeUnit timeUnit) { return (Subscriber s) -> { s.onSubscribe(new Subscription() diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Publishers.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Publishers.java index 64de92e84..1ba78eeca 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Publishers.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Publishers.java @@ -20,7 +20,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -50,32 +49,7 @@ private Publishers() { */ public static Publisher map(Publisher source, Function map) { return subscriber -> { - source.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - subscriber.onSubscribe(s); - } - - @Override - public void onNext(T t) { - try { - R r = map.apply(t); - subscriber.onNext(r); - } catch (Exception e) { - onError(e); - } - } - - @Override - public void onError(Throwable t) { - subscriber.onError(t); - } - - @Override - public void onComplete() { - subscriber.onComplete(); - } - }); + source.subscribe(new MapSubscriber<>(subscriber, map)); }; } @@ -157,36 +131,87 @@ public static Publisher timer(ScheduledExecutorService scheduler, long int } /** - * Adds retrying on errors to the passed {@code source}. + * Concats {@code first} source with the {@code second} source. This will subscribe to the {@code second} source + * when the first one completes. Any errors from the {@code first} source will result in not subscribing to the + * {@code second} source * - * @param source Source to add retry to. - * @param retryCount Number of times to retry. - * @param retrySelector Function that determines whether an error is retryable. + * @param first source to subscribe. + * @param second source to subscribe. * - * @param Type of items emitted by the source. - * - * @return A new {@code Publisher} with retry enabled. + * @return New {@code Publisher} which concats both the passed sources. */ - public static Publisher retry(Publisher source, int retryCount, - Function retrySelector) { - return s -> { - source.subscribe(new SafeCancellableSubscriberProxy(s) { - private final AtomicInteger budget = new AtomicInteger(retryCount); + public static Publisher concatEmpty(Publisher first, Publisher second) { + return subscriber -> { + first.subscribe(new SafeCancellableSubscriberProxy(subscriber) { @Override - protected void doOnError(Throwable t) { - if (retrySelector.apply(t) && budget.decrementAndGet() >= 0) { - done.set(false); // Reset done since we subscribe again. - // Since cancellation flag isn't cleared, if the subscription cancelled then this new - // subscription will automatically be cancelled. - source.subscribe(this); - } else { - super.doOnError(t); // Proxy to delegate. - } + protected void doOnComplete() { + second.subscribe(new SafeCancellableSubscriber() { + @Override + public void onSubscribe(Subscription s) { + super.onSubscribe(s); + // This is the second subscription which isn't driven by downstream subscriber. + // So, no onSubscriber callback will be coming here (alread done for first subscriber). + // As we are only dealing with empty (Void) sources, this doesn't break backpressure. + s.request(1); + } + + @Override + protected void doOnNext(Void aVoid) { + subscriber.onNext(aVoid); + } + + @Override + protected void doOnError(Throwable t) { + subscriber.onError(t); + } + + @Override + protected void doOnComplete() { + subscriber.onComplete(); + } + }); } }); }; } + /** + * A new {@code Publisher} that just emits the passed error on subscription. + * + * @param error that the returned source will emit. + * + * @return New {@code Publisher} which emits the passed {@code error}. + */ + public static Publisher error(Throwable error) { + return subscriber -> { + subscriber.onSubscribe(new SingleEmissionSubscription(subscriber, error)); + }; + } + + /** + * A new {@code Publisher} that just emits the passed {@code item} and completes. + * + * @param item that the returned source will emit. + * + * @return New {@code Publisher} which just emits the passed {@code item}. + */ + public static Publisher just(T item) { + return subscriber -> { + subscriber.onSubscribe(new SingleEmissionSubscription(subscriber, item)); + }; + } + + /** + * A new {@code Publisher} that immediately completes without emitting any item. + * + * @return New {@code Publisher} which immediately completes without emitting any item. + */ + public static Publisher empty() { + return subscriber -> { + subscriber.onSubscribe(new SingleEmissionSubscription(subscriber)); + }; + } + /** * Subscribes to the passed source and invokes the {@code action} once after either {@link Subscriber#onComplete()} * or {@link Subscriber#onError(Throwable)} is invoked. @@ -212,123 +237,50 @@ public void doOnComplete() { return () -> subscriber.cancel(); } - private static abstract class SafeCancellableSubscriberProxy extends SafeCancellableSubscriber { - - private final Subscriber delegate; + private static class MapSubscriber extends SafeCancellableSubscriber { + private final Subscriber subscriber; + private final Function map; - protected SafeCancellableSubscriberProxy(Subscriber delegate) { - this.delegate = delegate; + public MapSubscriber(Subscriber subscriber, Function map) { + this.subscriber = subscriber; + this.map = map; } @Override public void onSubscribe(Subscription s) { - super.onSubscribe(s); - delegate.onSubscribe(s); - } - - @Override - protected void doOnNext(T t) { - delegate.onNext(t); - } - - @Override - protected void doOnError(Throwable t) { - delegate.onError(t); - } - - @Override - protected void doOnComplete() { - delegate.onComplete(); - } - } - - private static abstract class SafeCancellableSubscriber extends CancellableSubscriber { - - protected final AtomicBoolean done = new AtomicBoolean(); - - @Override - public void onNext(T t) { - if (!done.get()) { - doOnNext(t); - } - } + Subscription s1 = new Subscription() { + @Override + public void request(long n) { + s.request(n); + } - @Override - public void onError(Throwable t) { - if (done.compareAndSet(false, true)) { - doOnError(t); - super.cancel(); - } + @Override + public void cancel() { + MapSubscriber.this.cancel(); + } + }; + super.onSubscribe(s1); + subscriber.onSubscribe(s1); } @Override - public void onComplete() { - if (done.compareAndSet(false, true)) { - doOnComplete(); + protected void doOnNext(T t) { + try { + R r = map.apply(t); + subscriber.onNext(r); + } catch (Exception e) { + onError(e); } } @Override - public void cancel() { - if (done.compareAndSet(false, true)) { - super.cancel(); - } - } - - protected void doOnNext(T t) { - // NoOp by default - } - protected void doOnError(Throwable t) { - // NoOp by default - } - - protected void doOnComplete() { - // NoOp by default + subscriber.onError(t); } - } - - private static abstract class CancellableSubscriber implements Subscriber { - - private Subscription s; - private boolean cancelled; @Override - public void onSubscribe(Subscription s) { - boolean _cancel = false; - synchronized (this) { - this.s = s; - if (cancelled) { - _cancel = true; - } - } - - if (_cancel) { - _unsafeCancel(); - } - } - - public void cancel() { - boolean _cancel = false; - synchronized (this) { - cancelled = true; - if (s != null) { - _cancel = true; - } - } - - if (_cancel) { - _unsafeCancel(); - } - } - - protected void doAfterCancel() { - // NoOp by default. - } - - private void _unsafeCancel() { - s.cancel(); - doAfterCancel(); + protected void doOnComplete() { + subscriber.onComplete(); } } } diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Responder.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Responder.java index 4698b5a1f..858ada82d 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Responder.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Responder.java @@ -141,7 +141,7 @@ public static Responder createClientResponder( */ public void sendLease(final int ttl, final int numberOfRequests) { Frame leaseFrame = Frame.Lease.from(ttl, numberOfRequests, Frame.NULL_BYTEBUFFER); - connection.addOutput(PublisherUtils.just(leaseFrame), new Completable() { + connection.addOutput(Publishers.just(leaseFrame), new Completable() { @Override public void success() {} @@ -284,7 +284,7 @@ public void onNext(Frame requestFrame) { if (Frame.Keepalive.hasRespondFlag(requestFrame)) { Frame keepAliveFrame = Frame.Keepalive.from( requestFrame.getData(), false); - responsePublisher = PublisherUtils.just(keepAliveFrame); + responsePublisher = Publishers.just(keepAliveFrame); } else { return; } @@ -338,7 +338,7 @@ private void setupErrorAndTearDown( // pass the ErrorFrame output, subscribe to write it, await // onComplete and then tear down final Frame frame = Frame.Error.from(0, setupException); - connection.addOutput(PublisherUtils.just(frame), + connection.addOutput(Publishers.just(frame), new Completable() { @Override public void success() { @@ -660,7 +660,7 @@ private Publisher handleFireAndForget( } // we always treat this as if it immediately completes as we don't want // errors passing back to the user - return PublisherUtils.empty(); + return Publishers.empty(); } private Publisher handleMetadataPush( @@ -676,7 +676,7 @@ private Publisher handleMetadataPush( } // we always treat this as if it immediately completes as we don't want // errors passing back to the user - return PublisherUtils.empty(); + return Publishers.empty(); } /** @@ -844,7 +844,7 @@ private void cleanup() { } // TODO should at least have an error message of some kind if the // Requester disregarded it - return PublisherUtils.empty(); + return Publishers.empty(); } else { // TODO should we use a BufferUntilSubscriber solution instead to // handle time-gap issues like this? diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriber.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriber.java new file mode 100644 index 000000000..6b89d9150 --- /dev/null +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriber.java @@ -0,0 +1,74 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import org.reactivestreams.Subscription; + +import java.util.concurrent.atomic.AtomicBoolean; + +abstract class SafeCancellableSubscriber extends CancellableSubscriber { + + protected final AtomicBoolean subscribed = new AtomicBoolean(); + protected final AtomicBoolean done = new AtomicBoolean(); + + @Override + public void onSubscribe(Subscription s) { + if (subscribed.compareAndSet(false, true)) { + super.onSubscribe(s); + } else { + onError(new IllegalStateException("Duplicate subscription.")); + } + } + + @Override + public void onNext(T t) { + if (!done.get()) { + doOnNext(t); + } + } + + @Override + public void onError(Throwable t) { + if (done.compareAndSet(false, true)) { + doOnError(t); + super.cancel(); + } + } + + @Override + public void onComplete() { + if (done.compareAndSet(false, true)) { + doOnComplete(); + } + } + + @Override + public void cancel() { + if (done.compareAndSet(false, true)) { + super.cancel(); + } + } + + protected void doOnNext(T t) { + // NoOp by default + } + + protected void doOnError(Throwable t) { + // NoOp by default + } + + protected void doOnComplete() { + // NoOp by default + } +} diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriberProxy.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriberProxy.java new file mode 100644 index 000000000..357f4ab62 --- /dev/null +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/SafeCancellableSubscriberProxy.java @@ -0,0 +1,58 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +abstract class SafeCancellableSubscriberProxy extends SafeCancellableSubscriber { + + private final Subscriber delegate; + + protected SafeCancellableSubscriberProxy(Subscriber delegate) { + this.delegate = delegate; + } + + @Override + public void onSubscribe(final Subscription s) { + Subscription s1 = new Subscription() { + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + SafeCancellableSubscriberProxy.this.cancel(); + } + }; + super.onSubscribe(s1); + delegate.onSubscribe(s1); + } + + @Override + protected void doOnNext(T t) { + delegate.onNext(t); + } + + @Override + protected void doOnError(Throwable t) { + delegate.onError(t); + } + + @Override + protected void doOnComplete() { + delegate.onComplete(); + } +} diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/SingleEmissionSubscription.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/SingleEmissionSubscription.java new file mode 100644 index 000000000..298857ff6 --- /dev/null +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/SingleEmissionSubscription.java @@ -0,0 +1,70 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +class SingleEmissionSubscription implements Subscription { + + private final Subscriber subscriber; + private final Throwable error; + private final T item; + private boolean done; + + public SingleEmissionSubscription(Subscriber subscriber, Throwable error) { + this.subscriber = subscriber; + this.error = error; + item = null; + } + + public SingleEmissionSubscription(Subscriber subscriber, T item) { + this.subscriber = subscriber; + error = null; + this.item = item; + } + + public SingleEmissionSubscription(Subscriber subscriber) { + this.subscriber = subscriber; + error = null; + item = null; + } + + @Override + public void request(long n) { + boolean _emit = false; + synchronized (this) { + if (!done) { + done = true; + _emit = true; + } + } + + if (_emit) { + if (error != null) { + subscriber.onError(error); + } else if (item != null) { + subscriber.onNext(item); + subscriber.onComplete(); + } else { + subscriber.onComplete(); + } + } + } + + @Override + public void cancel() { + // No Op since this is the starting publisher + } +} diff --git a/reactivesocket-core/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java b/reactivesocket-core/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java index fa08a17a9..eda71791d 100644 --- a/reactivesocket-core/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java +++ b/reactivesocket-core/src/perf/java/io/reactivesocket/ReactiveSocketPerf.java @@ -1,6 +1,6 @@ package io.reactivesocket; -import io.reactivesocket.internal.PublisherUtils; +import io.reactivesocket.internal.Publishers; import io.reactivesocket.perfutil.PerfTestConnection; import io.reactivesocket.rx.Completable; import org.openjdk.jmh.annotations.Benchmark; @@ -118,7 +118,7 @@ public Publisher handleSubscription(Payload payload) { @Override public Publisher handleFireAndForget(Payload payload) { - return PublisherUtils.empty(); + return Publishers.empty(); } @Override diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersConcatEmptyTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersConcatEmptyTest.java new file mode 100644 index 000000000..74626c887 --- /dev/null +++ b/reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersConcatEmptyTest.java @@ -0,0 +1,90 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import io.reactivex.Observable; +import io.reactivex.subscribers.TestSubscriber; +import org.hamcrest.MatcherAssert; +import org.junit.Test; +import org.reactivestreams.Publisher; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class PublishersConcatEmptyTest { + + @Test(timeout = 10000) + public void concatEmpty() throws Exception { + Publisher first = Publishers.empty(); + Publisher second = Publishers.empty(); + + Publisher concat = Publishers.concatEmpty(first, second); + TestSubscriber testSubscriber = new TestSubscriber<>(); + concat.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + } + + @Test(timeout = 10000) + public void concatEmptyFirstError() throws Exception { + NullPointerException npe = new NullPointerException(); + Publisher first = Publishers.error(npe); + AtomicBoolean secondSubscribed = new AtomicBoolean(); + Observable second = Observable.empty().doOnSubscribe(subscription -> secondSubscribed.set(true)); + + Publisher concat = Publishers.concatEmpty(first, second); + TestSubscriber testSubscriber = new TestSubscriber<>(); + concat.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertError(npe); + MatcherAssert.assertThat("Second source was subscribed even though first errored.", !secondSubscribed.get()); + } + + @Test(timeout = 10000) + public void concatEmptySecondError() throws Exception { + NullPointerException npe = new NullPointerException(); + AtomicBoolean firstSubscribed = new AtomicBoolean(); + Observable first = Observable.empty().doOnSubscribe(subscription -> firstSubscribed.set(true)); + AtomicBoolean secondSubscribed = new AtomicBoolean(); + Observable second = Observable.error(npe).doOnSubscribe(subscription -> secondSubscribed.set(true)); + + Publisher concat = Publishers.concatEmpty(first, second); + TestSubscriber testSubscriber = new TestSubscriber<>(); + concat.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertError(npe); + MatcherAssert.assertThat("First source was not subscribed.", firstSubscribed.get()); + MatcherAssert.assertThat("Second source was not subscribed.", secondSubscribed.get()); + } + + @Test(timeout = 10000) + public void concatEmptyVerifySubscribe() throws Exception { + AtomicBoolean firstSubscribed = new AtomicBoolean(); + Observable first = Observable.empty().doOnSubscribe(subscription -> firstSubscribed.set(true)); + AtomicBoolean secondSubscribed = new AtomicBoolean(); + Observable second = Observable.empty().doOnSubscribe(subscription -> secondSubscribed.set(true)); + + Publisher concat = Publishers.concatEmpty(first, second); + TestSubscriber testSubscriber = new TestSubscriber<>(); + concat.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + + MatcherAssert.assertThat("First source was not subscribed.", firstSubscribed.get()); + MatcherAssert.assertThat("Second source was not subscribed.", secondSubscribed.get()); + } +} \ No newline at end of file diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersMapTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersMapTest.java new file mode 100644 index 000000000..2b96e0e3a --- /dev/null +++ b/reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersMapTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.schedulers.TestScheduler; +import io.reactivex.subscribers.TestSubscriber; +import org.junit.Test; +import org.reactivestreams.Publisher; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class PublishersMapTest { + + @Test(timeout = 10000) + public void mapSameType() throws Exception { + testMap(num -> "Convert: " + num, "Hello1", "Hello2"); + } + + @Test(timeout = 10000) + public void mapConvertType() throws Exception { + testMap(num -> "Convert: " + num, 1, 2); + } + + @Test(timeout = 10000) + public void mapWithError() throws Exception { + NullPointerException npe = new NullPointerException(); + Publisher map = Publishers.map(Publishers.error(npe), o -> o); + TestSubscriber testSubscriber = new TestSubscriber<>(); + map.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertError(npe); + testSubscriber.assertNoValues(); + } + + @Test(timeout = 10000) + public void mapEmpty() throws Exception { + Publisher map = Publishers.map(Publishers.empty(), o -> o); + TestSubscriber testSubscriber = new TestSubscriber<>(); + map.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertNoValues(); + } + + @Test(timeout = 10000) + public void mapWithCancel() throws Exception { + String msg1 = "Hello1"; + String msg2 = "Hello2"; + String prefix = "Converted: "; + TestScheduler testScheduler = Schedulers.test(); + Publisher source = Observable.fromArray(msg1, msg2) + .concatWith(Observable.timer(1, TimeUnit.DAYS, testScheduler) + .map(String::valueOf)); + + Publisher map = Publishers.map(source, s -> prefix + s); + + TestSubscriber testSubscriber = new TestSubscriber<>(); + map.subscribe(testSubscriber); + + testSubscriber.assertNoErrors(); + + testSubscriber.assertValueCount(2); + + testSubscriber.assertValues(prefix + msg1, prefix + msg2); + testSubscriber.assertNotComplete(); + + testSubscriber.cancel(); + + testScheduler.advanceTimeBy(1, TimeUnit.DAYS); + + testSubscriber.assertNotComplete(); + testSubscriber.assertValueCount(2); + } + + @SafeVarargs + private static void testMap(Function mapFunc, T... msgs) { + Publisher source = Observable.fromArray(msgs); + Publisher map = Publishers.map(source, mapFunc); + + TestSubscriber testSubscriber = new TestSubscriber<>(); + map.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + + testSubscriber.assertValueCount(msgs.length); + + testSubscriber.assertValueSequence(Arrays.asList(msgs).stream().map(mapFunc).collect(Collectors.toList())); + } +} \ No newline at end of file diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersSingleEmissionsTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersSingleEmissionsTest.java new file mode 100644 index 000000000..32f32e340 --- /dev/null +++ b/reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersSingleEmissionsTest.java @@ -0,0 +1,56 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import io.reactivex.subscribers.TestSubscriber; +import org.junit.Test; +import org.reactivestreams.Publisher; + +public class PublishersSingleEmissionsTest { + + @Test(timeout = 10000) + public void error() throws Exception { + NullPointerException npe = new NullPointerException(); + Publisher empty = Publishers.error(npe); + TestSubscriber testSubscriber = new TestSubscriber<>(); + empty.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertError(npe); + testSubscriber.assertNoValues(); + } + + @Test(timeout = 10000) + public void just() throws Exception { + String msg = "hello"; + Publisher empty = Publishers.just(msg); + TestSubscriber testSubscriber = new TestSubscriber<>(); + empty.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertValue(msg); + } + + @Test(timeout = 10000) + public void empty() throws Exception { + Publisher empty = Publishers.empty(); + TestSubscriber testSubscriber = new TestSubscriber<>(); + empty.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertNoValues(); + } +} \ No newline at end of file diff --git a/reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersTimeoutTest.java b/reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersTimeoutTest.java new file mode 100644 index 000000000..a3376af5c --- /dev/null +++ b/reactivesocket-core/src/test/java/io/reactivesocket/internal/PublishersTimeoutTest.java @@ -0,0 +1,81 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.reactivesocket.internal; + +import io.reactivesocket.exceptions.TimeoutException; +import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.schedulers.TestScheduler; +import io.reactivex.subscribers.TestSubscriber; +import org.junit.Test; +import org.reactivestreams.Publisher; + +import java.util.concurrent.TimeUnit; + +public class PublishersTimeoutTest { + + @Test(timeout = 10000) + public void timeoutNotTriggeredSingleMessage() throws Exception { + String msg = "Hello"; + Publisher source = Publishers.just(msg); + TestScheduler testScheduler = Schedulers.test(); + Publisher timer = Observable.timer(1, TimeUnit.DAYS, testScheduler) + .ignoreElements().cast(Void.class); + Publisher timeout = Publishers.timeout(source, timer); + TestSubscriber testSubscriber = new TestSubscriber<>(); + timeout.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertValueCount(1); + testSubscriber.assertValue(msg); + } + + @Test(timeout = 10000) + public void timeoutTriggeredPostFirstMessage() throws Exception { + String msg = "Hello"; + Publisher source = Observable.just(msg) + .concatWith(Observable.never()); + TestScheduler testScheduler = Schedulers.test(); + Publisher timer = Observable.timer(1, TimeUnit.DAYS, testScheduler) + .ignoreElements().cast(Void.class); + Publisher timeout = Publishers.timeout(source, timer); + TestSubscriber testSubscriber = new TestSubscriber<>(); + timeout.subscribe(testSubscriber); + + testSubscriber.assertNoErrors(); + testSubscriber.assertValueCount(1); + testSubscriber.assertValue(msg); + + testScheduler.advanceTimeBy(1, TimeUnit.DAYS); + testSubscriber.assertNotTerminated(); + testSubscriber.assertValueCount(1); + } + + @Test(timeout = 10000) + public void timeoutTriggeredBeforeFirstMessage() throws Exception { + Publisher source = Observable.never(); + TestScheduler testScheduler = Schedulers.test(); + Publisher timer = Observable.timer(1, TimeUnit.DAYS, testScheduler) + .ignoreElements().cast(Void.class); + Publisher timeout = Publishers.timeout(source, timer); + TestSubscriber testSubscriber = new TestSubscriber<>(); + timeout.subscribe(testSubscriber); + + testScheduler.advanceTimeBy(1, TimeUnit.DAYS); + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertError(TimeoutException.class); + testSubscriber.assertNoValues(); + } +} \ No newline at end of file