From e872f0c35ae00a9b76670ff0f72bf70dc883d405 Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Fri, 8 Jul 2016 16:11:45 -0700 Subject: [PATCH] `FailingReactiveSocket` violates spec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### Problem `FailingReactiveSocket` was invoking `Subscriber.onError()` without calling `onSubscribe()` which is invalid as per reactive-streams spec: https://github.com/reactive-streams/reactive-streams-jvm#api-components ``` This means that onSubscribe is always signalled, followed by a possibly unbounded number of onNext signals (as requested by Subscriber) followed by an onError signal if there is a failure, or an onComplete signal when no more elements are available—all as long as the Subscription is not cancelled. ``` #### Modification Modified to use `Publishers.error()` which follows the spec correctly. #### Result Better adherence of spec, happy users :) --- .../io/reactivesocket/client/LoadBalancer.java | 17 +++++++++++------ .../reactivesocket/DefaultReactiveSocket.java | 17 ----------------- 2 files changed, 11 insertions(+), 23 deletions(-) diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java index 0ee8b75d6..bce59c203 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java @@ -24,6 +24,7 @@ import io.reactivesocket.client.stat.Ewma; import io.reactivesocket.exceptions.TimeoutException; import io.reactivesocket.exceptions.TransportException; +import io.reactivesocket.internal.Publishers; import io.reactivesocket.rx.Completable; import io.reactivesocket.client.stat.FrugalQuantile; import io.reactivesocket.client.stat.Quantile; @@ -629,38 +630,42 @@ public void onComplete() {} * when dealing with edge cases. */ private static class FailingReactiveSocket implements ReactiveSocket { + @SuppressWarnings("ThrowableInstanceNeverThrown") private static final NoAvailableReactiveSocketException NO_AVAILABLE_RS_EXCEPTION = new NoAvailableReactiveSocketException(); + private static final Publisher errorVoid = Publishers.error(NO_AVAILABLE_RS_EXCEPTION); + private static final Publisher errorPayload = Publishers.error(NO_AVAILABLE_RS_EXCEPTION); + @Override public Publisher fireAndForget(Payload payload) { - return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + return errorVoid; } @Override public Publisher requestResponse(Payload payload) { - return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + return errorPayload; } @Override public Publisher requestStream(Payload payload) { - return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + return errorPayload; } @Override public Publisher requestSubscription(Payload payload) { - return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + return errorPayload; } @Override public Publisher requestChannel(Publisher payloads) { - return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + return errorPayload; } @Override public Publisher metadataPush(Payload payload) { - return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + return errorVoid; } @Override diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/DefaultReactiveSocket.java b/reactivesocket-core/src/main/java/io/reactivesocket/DefaultReactiveSocket.java index 8f5b8a911..a10448e8f 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/DefaultReactiveSocket.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/DefaultReactiveSocket.java @@ -478,23 +478,6 @@ public void shutdown() { } } - private static Publisher error(Throwable e) { - return (Subscriber s) -> { - s.onSubscribe(new Subscription() { - @Override - public void request(long n) { - // should probably worry about n==0 - s.onError(e); - } - - @Override - public void cancel() { - // ignoring just because - } - }); - }; - } - public String toString() { return "duplexConnection=[" + this.connection + "]"; }