Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -454,17 +454,15 @@ public void onShutdown(Completable c) {
@Override
public void close() throws Exception {
try {
connection.close();
leaseGovernor.unregister(responder);
if (requester != null) {
requester.shutdown();
}
if (responder != null) {
responder.shutdown();
}

connection.close();
shutdownListeners.forEach(Completable::success);

} catch (Throwable t) {
shutdownListeners.forEach(c -> c.error(t));
throw t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,23 @@
* Concrete implementations of {@link DuplexConnection} over TCP, WebSockets, Aeron, etc can be passed to this class for protocol handling.
*/
public class Requester {

private final static Disposable CANCELLED = new EmptyDisposable();
private final static int KEEPALIVE_INTERVAL_MS = 1000;
private static final Disposable CANCELLED = new EmptyDisposable();
private static final int KEEPALIVE_INTERVAL_MS = 1000;
private static final long DEFAULT_BATCH = 1024;
private static final long REQUEST_THRESHOLD = 256;

private final boolean isServer;
private final DuplexConnection connection;
private final Int2ObjectHashMap<UnicastSubject<Frame>> streamInputMap = new Int2ObjectHashMap<>();
private final ConnectionSetupPayload setupPayload;
private final Consumer<Throwable> errorStream;

private final boolean honorLease;

private long ttlExpiration;
private long numberOfRemainingRequests = 0;
private long timeOfLastKeepalive = 0;
private int streamCount = 0; // 0 is reserved for setup, all normal messages are >= 1

private static final long DEFAULT_BATCH = 1024;
private static final long REQUEST_THRESHOLD = 256;
private AtomicReference<Disposable> connectionSubscription = new AtomicReference<>();

private volatile boolean requesterStarted = false;

Expand Down Expand Up @@ -115,8 +114,10 @@ public static Requester createServerRequester(
}

public void shutdown() {
// TODO do something here
System.err.println("**** Requester.shutdown => this should actually do something");
Disposable disposable = connectionSubscription.getAndSet(CANCELLED);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: EmptyDisposable (CANCELLED here) ignores dispose call, so you don't need && disposable != CANCELLED check here.

if (disposable != null) {
disposable.dispose();
}
}

public boolean isServer() {
Expand Down Expand Up @@ -163,7 +164,7 @@ public Publisher<Payload> requestStream(final Payload payload) {
*/
public Publisher<Void> fireAndForget(final Payload payload) {
if (payload == null) {
throw new IllegalStateException("Payload can not be null");
throw new IllegalStateException(name() + " Payload can not be null");
}
assertStarted();
return child -> child.onSubscribe(new Subscription() {
Expand Down Expand Up @@ -211,7 +212,7 @@ public void cancel() {
*/
public Publisher<Void> metadataPush(final Payload payload) {
if (payload == null) {
throw new IllegalArgumentException("Payload can not be null");
throw new IllegalArgumentException(name() + " Payload can not be null");
}
assertStarted();
return (Subscriber<? super Void> child) ->
Expand Down Expand Up @@ -273,7 +274,7 @@ public Publisher<Payload> requestChannel(final Publisher<Payload> payloadStream)

private void assertStarted() {
if (!requesterStarted) {
throw new IllegalStateException("Requester not initialized. " +
throw new IllegalStateException(name() + " Requester not initialized. " +
"Please await 'start()' completion before submitting requests.");
}
}
Expand Down Expand Up @@ -411,7 +412,7 @@ private Publisher<Payload> startChannel(
Publisher<Payload> payloads
) {
if (payloads == null) {
throw new IllegalStateException("Both payload and payloads can not be null");
throw new IllegalStateException(name() + " Both payload and payloads can not be null");
}
assertStarted();
return (Subscriber<? super Payload> child) -> {
Expand Down Expand Up @@ -497,7 +498,7 @@ public void onNext(Payload p) {
public void onError(Throwable t) {
// TODO validate with unit tests
RuntimeException exc = new RuntimeException(
"Error received from request stream.", t);
name() + " Error received from request stream.", t);
transport.onError(exc);
child.onError(exc);
cancel();
Expand Down Expand Up @@ -617,7 +618,7 @@ public void cancel() {
*/
private Publisher<Payload> startRequestResponse(int streamId, FrameType type, Payload payload) {
if (payload == null) {
throw new IllegalStateException("Both payload and payloads can not be null");
throw new IllegalStateException(name() + " Both payload and payloads can not be null");
}
assertStarted();
return (Subscriber<? super Payload> child) -> {
Expand Down Expand Up @@ -837,7 +838,6 @@ private int nextStreamId() {
}

private void start(Completable onComplete) {
AtomicReference<Disposable> connectionSubscription = new AtomicReference<>();
// get input from responder->requestor for responses
connection.getInput().subscribe(new Observer<Frame>() {
public void onSubscribe(Disposable d) {
Expand Down Expand Up @@ -888,7 +888,7 @@ public void error(Throwable e) {
} else {
// means we already were cancelled
d.dispose();
onComplete.error(new CancelException("Connection Is Already Cancelled"));
onComplete.error(new CancelException(name() + " Connection Is Already Cancelled"));
}
}

Expand Down Expand Up @@ -916,7 +916,7 @@ public void onNext(Frame frame) {
timeOfLastKeepalive = System.currentTimeMillis();
} else {
onError(new RuntimeException(
"Received unexpected message type on stream 0: " + frame.getType().name()));
name() + " Received unexpected message type on stream 0: " + frame.getType().name()));
}
} else {
UnicastSubject<Frame> streamSubject;
Expand All @@ -934,11 +934,11 @@ public void onNext(Frame frame) {
if (frame.getType() == FrameType.ERROR) {
String errorMessage = getByteBufferAsString(frame.getData());
onError(new RuntimeException(
"Received error for non-existent stream: "
name() + " Received error for non-existent stream: "
+ streamId + " Message: " + errorMessage));
} else {
onError(new RuntimeException(
"Received message for non-existent stream: " + streamId));
name() + " Received message for non-existent stream: " + streamId));
}
}
} else {
Expand Down Expand Up @@ -981,6 +981,14 @@ public void cancel() { // TODO this isn't used ... is it supposed to be?
});
}

private String name() {
if (isServer) {
return "ServerRequester";
} else {
return "ClientRequester";
}
}

private static String getByteBufferAsString(ByteBuffer bb) {
final byte[] bytes = new byte[bb.capacity()];
bb.get(bytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
Expand All @@ -53,6 +54,8 @@
* for each request over the connection.
*/
public class Responder {
private final static Disposable CANCELLED = new EmptyDisposable();

private final DuplexConnection connection;
private final ConnectionSetupHandler connectionHandler; // for server
private final RequestHandler clientRequestHandler; // for client
Expand All @@ -61,6 +64,7 @@ public class Responder {
private long timeOfLastKeepalive;
private final Consumer<ConnectionSetupPayload> setupCallback;
private final boolean isServer;
private final AtomicReference<Disposable> transportSubscription = new AtomicReference<>();

private Responder(
boolean isServer,
Expand Down Expand Up @@ -146,7 +150,7 @@ public void success() {}

@Override
public void error(Throwable e) {
errorStream.accept(new RuntimeException("could not send lease ", e));
errorStream.accept(new RuntimeException(name() + ": could not send lease ", e));
}
});
}
Expand All @@ -171,7 +175,6 @@ private void start(final Completable responderCompletable, ReactiveSocket reacti
final Int2ObjectHashMap<UnicastSubject<Payload>> channels = new Int2ObjectHashMap<>();

final AtomicBoolean childTerminated = new AtomicBoolean(false);
final AtomicReference<Disposable> transportSubscription = new AtomicReference<>();

// subscribe to transport to get Frames
connection.getInput().subscribe(new Observer<Frame>() {
Expand Down Expand Up @@ -205,8 +208,7 @@ public void onNext(Frame requestFrame) {
try {
int version = Frame.Setup.version(requestFrame);
if (version != SetupFrameFlyweight.CURRENT_VERSION) {
throw new SetupException("unsupported protocol version: "
+ version);
throw new SetupException(name() + ": unsupported protocol version: " + version);
}

// accept setup for ReactiveSocket/Requester usage
Expand All @@ -231,7 +233,7 @@ public void onNext(Frame requestFrame) {
// TODO: handle keepalive logic here
} else {
setupErrorAndTearDown(connection,
new InvalidSetupException("Setup frame missing"));
new InvalidSetupException(name() + ": Setup frame missing"));
}
} else {
Publisher<Frame> responsePublisher = null;
Expand Down Expand Up @@ -293,21 +295,21 @@ public void onNext(Frame requestFrame) {
// LEASE only concerns the Requester
} else {
IllegalStateException exc = new IllegalStateException(
"Unexpected prefix: " + requestFrame.getType());
name() + ": Unexpected prefix: " + requestFrame.getType());
responsePublisher = PublisherUtils.errorFrame(streamId, exc);
}
} catch (Throwable e) {
// synchronous try/catch since we execute user functions
// in the handlers and they could throw
errorStream.accept(
new RuntimeException("Error in request handling.", e));
new RuntimeException(name() + ": Error in request handling.", e));
// error message to user
responsePublisher = PublisherUtils.errorFrame(
streamId, new RuntimeException(
"Unhandled error processing request"));
name() + ": Unhandled error processing request"));
}
} else {
RejectedException exception = new RejectedException("No associated lease");
RejectedException exception = new RejectedException(name() + ": No associated lease");
responsePublisher = PublisherUtils.errorFrame(streamId, exception);
}

Expand Down Expand Up @@ -348,15 +350,15 @@ public void success() {
@Override
public void error(Throwable e) {
RuntimeException exc = new RuntimeException(
"Failure outputting SetupException", e);
name() + ": Failure outputting SetupException", e);
tearDownWithError(exc);
}
});
}

private void tearDownWithError(Throwable se) {
// TODO unit test that this actually shuts things down
onError(new RuntimeException("Connection Setup Failure", se));
onError(new RuntimeException(name() + ": Connection Setup Failure", se));
}

@Override
Expand All @@ -380,7 +382,8 @@ public void onComplete() {
private void cancel() {
// child has cancelled (shutdown the connection or server)
// TODO validate with unit tests
if (!transportSubscription.compareAndSet(null, EmptyDisposable.EMPTY)) {
Disposable disposable = transportSubscription.getAndSet(CANCELLED);
if (disposable != null) {
// cancel the one that was there if we failed to set the sentinel
transportSubscription.get().dispose();
}
Expand All @@ -390,8 +393,10 @@ private void cancel() {
}

public void shutdown() {
// TODO do something here
System.err.println("**** Responder.shutdown => this should actually do something");
Disposable disposable = transportSubscription.getAndSet(CANCELLED);
if (disposable != null && disposable != CANCELLED) {
disposable.dispose();
}
}

private Publisher<Frame> handleRequestResponse(
Expand Down Expand Up @@ -432,7 +437,7 @@ public void onSubscribe(Subscription s) {
public void onNext(Payload v) {
if (++count > 1) {
IllegalStateException exc = new IllegalStateException(
"RequestResponse expects a single onNext");
name() + ": RequestResponse expects a single onNext");
onError(exc);
} else {
Frame nextCompleteFrame = Frame.Response.from(
Expand All @@ -451,7 +456,7 @@ public void onError(Throwable t) {
public void onComplete() {
if (count != 1) {
IllegalStateException exc = new IllegalStateException(
"RequestResponse expects a single onNext");
name() + ": RequestResponse expects a single onNext");
onError(exc);
} else {
child.onComplete();
Expand Down Expand Up @@ -602,8 +607,8 @@ public void onComplete() {
cleanup();
} else {
IllegalStateException exc = new IllegalStateException(
"Unexpected onComplete occurred on " +
"'requestSubscription'");
name() + ": Unexpected onComplete occurred on " +
"'requestSubscription'");
onError(exc);
}
}
Expand Down Expand Up @@ -652,7 +657,7 @@ private Publisher<Frame> handleFireAndForget(
} catch (Throwable e) {
// we catch these errors here as we don't want anything propagating
// back to the user on fireAndForget
errorStream.accept(new RuntimeException("Error processing 'fireAndForget'", e));
errorStream.accept(new RuntimeException(name() + ": Error processing 'fireAndForget'", e));
}
// we always treat this as if it immediately completes as we don't want
// errors passing back to the user
Expand All @@ -668,7 +673,7 @@ private Publisher<Frame> handleMetadataPush(
} catch (Throwable e) {
// we catch these errors here as we don't want anything propagating
// back to the user on metadataPush
errorStream.accept(new RuntimeException("Error processing 'metadataPush'", e));
errorStream.accept(new RuntimeException(name() + ": Error processing 'metadataPush'", e));
}
// we always treat this as if it immediately completes as we don't want
// errors passing back to the user
Expand Down Expand Up @@ -745,7 +750,7 @@ public void request(long n) {
// didn't correct wait for REQUEST_N before sending
// more frames
RuntimeException exc = new RuntimeException(
"Requester sent more than 1 requestChannel " +
name() + " sent more than 1 requestChannel " +
"frame before permitted.");
child.onNext(Frame.Error.from(streamId, exc));
child.onComplete();
Expand Down Expand Up @@ -846,11 +851,19 @@ private void cleanup() {
// handle time-gap issues like this?
// TODO validate with unit tests.
return PublisherUtils.errorFrame(
streamId, new RuntimeException("Channel unavailable"));
streamId, new RuntimeException(name() + ": Channel unavailable"));
}
}
}


private String name() {
if (isServer) {
return "ServerResponder";
} else {
return "ClientResponder";
}
}

private static class SubscriptionArbiter {
private Subscription applicationProducer;
private long appRequested = 0;
Expand Down