diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/DefaultReactiveSocket.java b/reactivesocket-core/src/main/java/io/reactivesocket/DefaultReactiveSocket.java index 1eb3a48c2..8f5b8a911 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/DefaultReactiveSocket.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/DefaultReactiveSocket.java @@ -454,7 +454,6 @@ public void onShutdown(Completable c) { @Override public void close() throws Exception { try { - connection.close(); leaseGovernor.unregister(responder); if (requester != null) { requester.shutdown(); @@ -462,9 +461,8 @@ public void close() throws Exception { if (responder != null) { responder.shutdown(); } - + connection.close(); shutdownListeners.forEach(Completable::success); - } catch (Throwable t) { shutdownListeners.forEach(c -> c.error(t)); throw t; diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Requester.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Requester.java index 284d3b7a4..9ee7e812b 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Requester.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Requester.java @@ -52,24 +52,23 @@ * Concrete implementations of {@link DuplexConnection} over TCP, WebSockets, Aeron, etc can be passed to this class for protocol handling. */ public class Requester { - - private final static Disposable CANCELLED = new EmptyDisposable(); - private final static int KEEPALIVE_INTERVAL_MS = 1000; + private static final Disposable CANCELLED = new EmptyDisposable(); + private static final int KEEPALIVE_INTERVAL_MS = 1000; + private static final long DEFAULT_BATCH = 1024; + private static final long REQUEST_THRESHOLD = 256; private final boolean isServer; private final DuplexConnection connection; private final Int2ObjectHashMap> streamInputMap = new Int2ObjectHashMap<>(); private final ConnectionSetupPayload setupPayload; private final Consumer errorStream; - private final boolean honorLease; + private long ttlExpiration; private long numberOfRemainingRequests = 0; private long timeOfLastKeepalive = 0; private int streamCount = 0; // 0 is reserved for setup, all normal messages are >= 1 - - private static final long DEFAULT_BATCH = 1024; - private static final long REQUEST_THRESHOLD = 256; + private AtomicReference connectionSubscription = new AtomicReference<>(); private volatile boolean requesterStarted = false; @@ -115,8 +114,10 @@ public static Requester createServerRequester( } public void shutdown() { - // TODO do something here - System.err.println("**** Requester.shutdown => this should actually do something"); + Disposable disposable = connectionSubscription.getAndSet(CANCELLED); + if (disposable != null) { + disposable.dispose(); + } } public boolean isServer() { @@ -163,7 +164,7 @@ public Publisher requestStream(final Payload payload) { */ public Publisher fireAndForget(final Payload payload) { if (payload == null) { - throw new IllegalStateException("Payload can not be null"); + throw new IllegalStateException(name() + " Payload can not be null"); } assertStarted(); return child -> child.onSubscribe(new Subscription() { @@ -211,7 +212,7 @@ public void cancel() { */ public Publisher metadataPush(final Payload payload) { if (payload == null) { - throw new IllegalArgumentException("Payload can not be null"); + throw new IllegalArgumentException(name() + " Payload can not be null"); } assertStarted(); return (Subscriber child) -> @@ -273,7 +274,7 @@ public Publisher requestChannel(final Publisher payloadStream) private void assertStarted() { if (!requesterStarted) { - throw new IllegalStateException("Requester not initialized. " + + throw new IllegalStateException(name() + " Requester not initialized. " + "Please await 'start()' completion before submitting requests."); } } @@ -411,7 +412,7 @@ private Publisher startChannel( Publisher payloads ) { if (payloads == null) { - throw new IllegalStateException("Both payload and payloads can not be null"); + throw new IllegalStateException(name() + " Both payload and payloads can not be null"); } assertStarted(); return (Subscriber child) -> { @@ -497,7 +498,7 @@ public void onNext(Payload p) { public void onError(Throwable t) { // TODO validate with unit tests RuntimeException exc = new RuntimeException( - "Error received from request stream.", t); + name() + " Error received from request stream.", t); transport.onError(exc); child.onError(exc); cancel(); @@ -617,7 +618,7 @@ public void cancel() { */ private Publisher startRequestResponse(int streamId, FrameType type, Payload payload) { if (payload == null) { - throw new IllegalStateException("Both payload and payloads can not be null"); + throw new IllegalStateException(name() + " Both payload and payloads can not be null"); } assertStarted(); return (Subscriber child) -> { @@ -837,7 +838,6 @@ private int nextStreamId() { } private void start(Completable onComplete) { - AtomicReference connectionSubscription = new AtomicReference<>(); // get input from responder->requestor for responses connection.getInput().subscribe(new Observer() { public void onSubscribe(Disposable d) { @@ -888,7 +888,7 @@ public void error(Throwable e) { } else { // means we already were cancelled d.dispose(); - onComplete.error(new CancelException("Connection Is Already Cancelled")); + onComplete.error(new CancelException(name() + " Connection Is Already Cancelled")); } } @@ -916,7 +916,7 @@ public void onNext(Frame frame) { timeOfLastKeepalive = System.currentTimeMillis(); } else { onError(new RuntimeException( - "Received unexpected message type on stream 0: " + frame.getType().name())); + name() + " Received unexpected message type on stream 0: " + frame.getType().name())); } } else { UnicastSubject streamSubject; @@ -934,11 +934,11 @@ public void onNext(Frame frame) { if (frame.getType() == FrameType.ERROR) { String errorMessage = getByteBufferAsString(frame.getData()); onError(new RuntimeException( - "Received error for non-existent stream: " + name() + " Received error for non-existent stream: " + streamId + " Message: " + errorMessage)); } else { onError(new RuntimeException( - "Received message for non-existent stream: " + streamId)); + name() + " Received message for non-existent stream: " + streamId)); } } } else { @@ -981,6 +981,14 @@ public void cancel() { // TODO this isn't used ... is it supposed to be? }); } + private String name() { + if (isServer) { + return "ServerRequester"; + } else { + return "ClientRequester"; + } + } + private static String getByteBufferAsString(ByteBuffer bb) { final byte[] bytes = new byte[bb.capacity()]; bb.get(bytes); diff --git a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Responder.java b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Responder.java index 0af49c4ff..d319c39d5 100644 --- a/reactivesocket-core/src/main/java/io/reactivesocket/internal/Responder.java +++ b/reactivesocket-core/src/main/java/io/reactivesocket/internal/Responder.java @@ -39,6 +39,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import java.nio.channels.ClosedChannelException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -53,6 +54,8 @@ * for each request over the connection. */ public class Responder { + private final static Disposable CANCELLED = new EmptyDisposable(); + private final DuplexConnection connection; private final ConnectionSetupHandler connectionHandler; // for server private final RequestHandler clientRequestHandler; // for client @@ -61,6 +64,7 @@ public class Responder { private long timeOfLastKeepalive; private final Consumer setupCallback; private final boolean isServer; + private final AtomicReference transportSubscription = new AtomicReference<>(); private Responder( boolean isServer, @@ -146,7 +150,7 @@ public void success() {} @Override public void error(Throwable e) { - errorStream.accept(new RuntimeException("could not send lease ", e)); + errorStream.accept(new RuntimeException(name() + ": could not send lease ", e)); } }); } @@ -171,7 +175,6 @@ private void start(final Completable responderCompletable, ReactiveSocket reacti final Int2ObjectHashMap> channels = new Int2ObjectHashMap<>(); final AtomicBoolean childTerminated = new AtomicBoolean(false); - final AtomicReference transportSubscription = new AtomicReference<>(); // subscribe to transport to get Frames connection.getInput().subscribe(new Observer() { @@ -205,8 +208,7 @@ public void onNext(Frame requestFrame) { try { int version = Frame.Setup.version(requestFrame); if (version != SetupFrameFlyweight.CURRENT_VERSION) { - throw new SetupException("unsupported protocol version: " - + version); + throw new SetupException(name() + ": unsupported protocol version: " + version); } // accept setup for ReactiveSocket/Requester usage @@ -231,7 +233,7 @@ public void onNext(Frame requestFrame) { // TODO: handle keepalive logic here } else { setupErrorAndTearDown(connection, - new InvalidSetupException("Setup frame missing")); + new InvalidSetupException(name() + ": Setup frame missing")); } } else { Publisher responsePublisher = null; @@ -293,21 +295,21 @@ public void onNext(Frame requestFrame) { // LEASE only concerns the Requester } else { IllegalStateException exc = new IllegalStateException( - "Unexpected prefix: " + requestFrame.getType()); + name() + ": Unexpected prefix: " + requestFrame.getType()); responsePublisher = PublisherUtils.errorFrame(streamId, exc); } } catch (Throwable e) { // synchronous try/catch since we execute user functions // in the handlers and they could throw errorStream.accept( - new RuntimeException("Error in request handling.", e)); + new RuntimeException(name() + ": Error in request handling.", e)); // error message to user responsePublisher = PublisherUtils.errorFrame( streamId, new RuntimeException( - "Unhandled error processing request")); + name() + ": Unhandled error processing request")); } } else { - RejectedException exception = new RejectedException("No associated lease"); + RejectedException exception = new RejectedException(name() + ": No associated lease"); responsePublisher = PublisherUtils.errorFrame(streamId, exception); } @@ -348,7 +350,7 @@ public void success() { @Override public void error(Throwable e) { RuntimeException exc = new RuntimeException( - "Failure outputting SetupException", e); + name() + ": Failure outputting SetupException", e); tearDownWithError(exc); } }); @@ -356,7 +358,7 @@ public void error(Throwable e) { private void tearDownWithError(Throwable se) { // TODO unit test that this actually shuts things down - onError(new RuntimeException("Connection Setup Failure", se)); + onError(new RuntimeException(name() + ": Connection Setup Failure", se)); } @Override @@ -380,7 +382,8 @@ public void onComplete() { private void cancel() { // child has cancelled (shutdown the connection or server) // TODO validate with unit tests - if (!transportSubscription.compareAndSet(null, EmptyDisposable.EMPTY)) { + Disposable disposable = transportSubscription.getAndSet(CANCELLED); + if (disposable != null) { // cancel the one that was there if we failed to set the sentinel transportSubscription.get().dispose(); } @@ -390,8 +393,10 @@ private void cancel() { } public void shutdown() { - // TODO do something here - System.err.println("**** Responder.shutdown => this should actually do something"); + Disposable disposable = transportSubscription.getAndSet(CANCELLED); + if (disposable != null && disposable != CANCELLED) { + disposable.dispose(); + } } private Publisher handleRequestResponse( @@ -432,7 +437,7 @@ public void onSubscribe(Subscription s) { public void onNext(Payload v) { if (++count > 1) { IllegalStateException exc = new IllegalStateException( - "RequestResponse expects a single onNext"); + name() + ": RequestResponse expects a single onNext"); onError(exc); } else { Frame nextCompleteFrame = Frame.Response.from( @@ -451,7 +456,7 @@ public void onError(Throwable t) { public void onComplete() { if (count != 1) { IllegalStateException exc = new IllegalStateException( - "RequestResponse expects a single onNext"); + name() + ": RequestResponse expects a single onNext"); onError(exc); } else { child.onComplete(); @@ -602,8 +607,8 @@ public void onComplete() { cleanup(); } else { IllegalStateException exc = new IllegalStateException( - "Unexpected onComplete occurred on " + - "'requestSubscription'"); + name() + ": Unexpected onComplete occurred on " + + "'requestSubscription'"); onError(exc); } } @@ -652,7 +657,7 @@ private Publisher handleFireAndForget( } catch (Throwable e) { // we catch these errors here as we don't want anything propagating // back to the user on fireAndForget - errorStream.accept(new RuntimeException("Error processing 'fireAndForget'", e)); + errorStream.accept(new RuntimeException(name() + ": Error processing 'fireAndForget'", e)); } // we always treat this as if it immediately completes as we don't want // errors passing back to the user @@ -668,7 +673,7 @@ private Publisher handleMetadataPush( } catch (Throwable e) { // we catch these errors here as we don't want anything propagating // back to the user on metadataPush - errorStream.accept(new RuntimeException("Error processing 'metadataPush'", e)); + errorStream.accept(new RuntimeException(name() + ": Error processing 'metadataPush'", e)); } // we always treat this as if it immediately completes as we don't want // errors passing back to the user @@ -745,7 +750,7 @@ public void request(long n) { // didn't correct wait for REQUEST_N before sending // more frames RuntimeException exc = new RuntimeException( - "Requester sent more than 1 requestChannel " + + name() + " sent more than 1 requestChannel " + "frame before permitted."); child.onNext(Frame.Error.from(streamId, exc)); child.onComplete(); @@ -846,11 +851,19 @@ private void cleanup() { // handle time-gap issues like this? // TODO validate with unit tests. return PublisherUtils.errorFrame( - streamId, new RuntimeException("Channel unavailable")); + streamId, new RuntimeException(name() + ": Channel unavailable")); } } } - + + private String name() { + if (isServer) { + return "ServerResponder"; + } else { + return "ClientResponder"; + } + } + private static class SubscriptionArbiter { private Subscription applicationProducer; private long appRequested = 0;