diff --git a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java index 0ee8b75d6..bce59c203 100644 --- a/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java +++ b/reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java @@ -24,6 +24,7 @@ import io.reactivesocket.client.stat.Ewma; import io.reactivesocket.exceptions.TimeoutException; import io.reactivesocket.exceptions.TransportException; +import io.reactivesocket.internal.Publishers; import io.reactivesocket.rx.Completable; import io.reactivesocket.client.stat.FrugalQuantile; import io.reactivesocket.client.stat.Quantile; @@ -629,38 +630,42 @@ public void onComplete() {} * when dealing with edge cases. */ private static class FailingReactiveSocket implements ReactiveSocket { + @SuppressWarnings("ThrowableInstanceNeverThrown") private static final NoAvailableReactiveSocketException NO_AVAILABLE_RS_EXCEPTION = new NoAvailableReactiveSocketException(); + private static final Publisher errorVoid = Publishers.error(NO_AVAILABLE_RS_EXCEPTION); + private static final Publisher errorPayload = Publishers.error(NO_AVAILABLE_RS_EXCEPTION); + @Override public Publisher fireAndForget(Payload payload) { - return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + return errorVoid; } @Override public Publisher requestResponse(Payload payload) { - return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + return errorPayload; } @Override public Publisher requestStream(Payload payload) { - return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + return errorPayload; } @Override public Publisher requestSubscription(Payload payload) { - return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + return errorPayload; } @Override public Publisher requestChannel(Publisher payloads) { - return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + return errorPayload; } @Override public Publisher metadataPush(Payload payload) { - return subscriber -> subscriber.onError(NO_AVAILABLE_RS_EXCEPTION); + return errorVoid; } @Override diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/DefaultReactiveSocket.java b/reactivesocket-core/src/main/java/io/reactivesocket/DefaultReactiveSocket.java index 8f5b8a911..a10448e8f 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/DefaultReactiveSocket.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/DefaultReactiveSocket.java @@ -478,23 +478,6 @@ public void shutdown() { } } - private static Publisher error(Throwable e) { - return (Subscriber s) -> { - s.onSubscribe(new Subscription() { - @Override - public void request(long n) { - // should probably worry about n==0 - s.onError(e); - } - - @Override - public void cancel() { - // ignoring just because - } - }); - }; - } - public String toString() { return "duplexConnection=[" + this.connection + "]"; }