Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.ReferenceCounted;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
Expand Down Expand Up @@ -355,34 +355,26 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
final HttpRequest httpRequest = (HttpRequest) msg;

if (ProxyUtils.isChunked(httpRequest)) {
process(ctx, httpRequest);
process(ctx, httpRequest, true);
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need it so the message is not released when we use chunks (when we do not use custom executor)

} else {
if (httpRequest instanceof ReferenceCounted) {
LOG.debug("Retaining reference counted message");
((ReferenceCounted) msg).retain();
}
ReferenceCountUtil.retain(httpRequest);

proxyServer.getMessageProcessingExecutor()
.execute(wrapTask(() -> {
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to move wrapTask into try block so we release the message in case an exception happens there

.execute(() -> {
try {
process(ctx, httpRequest);
wrapTask(() -> process(ctx, httpRequest, false)).run();
} catch (Exception e) {
ctx.fireExceptionCaught(e);
} finally {
if (httpRequest instanceof ReferenceCounted) {
LOG.debug("Retaining reference counted message");
((ReferenceCounted) httpRequest).release();
}
ReferenceCountUtil.release(httpRequest);
}
}));
});
}
}

private void process(ChannelHandlerContext ctx, HttpRequest httpRequest) {
private void process(ChannelHandlerContext ctx, HttpRequest httpRequest, boolean chunked) {

boolean authenticationRequired = false;
HttpResponse shortCircuitResponse = null;
authenticationRequired = authenticationRequired(httpRequest);
boolean authenticationRequired = authenticationRequired(httpRequest);

if (authenticationRequired) {
LOG.debug("Not authenticated!!");
Expand All @@ -398,9 +390,7 @@ private void process(ChannelHandlerContext ctx, HttpRequest httpRequest) {
filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx);
} finally {
// releasing a copied http request
if (currentRequest instanceof ReferenceCounted) {
((ReferenceCounted) currentRequest).release();
}
ReferenceCountUtil.release(currentRequest);
}
if (filterInstance != null) {
currentFilters = filterInstance;
Expand All @@ -409,17 +399,24 @@ private void process(ChannelHandlerContext ctx, HttpRequest httpRequest) {
}

// Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required
shortCircuitResponse = currentFilters.clientToProxyRequest(httpRequest);

}
final HttpResponse shortCircuitResponse = currentFilters.clientToProxyRequest(httpRequest);

if (!authenticationRequired) {
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored authenticationRequired part, there was one extra if statement

if (httpRequest instanceof ReferenceCounted) {
LOG.debug("Retaining reference counted message");
((ReferenceCounted) httpRequest).retain();
if (chunked) {
ctx.fireChannelRead(new UpstreamConnectionHandler.Request(httpRequest, shortCircuitResponse));
} else {
ReferenceCountUtil.retain(httpRequest);
channel.eventLoop().execute(() -> {
try {
wrapTask(() ->
ctx.fireChannelRead(
new UpstreamConnectionHandler.Request(httpRequest, shortCircuitResponse))
).run();
} finally {
ReferenceCountUtil.release(httpRequest);
}
});
}

ctx.fireChannelRead(new UpstreamConnectionHandler.Request(httpRequest, shortCircuitResponse));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
Expand Down Expand Up @@ -259,26 +260,22 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
respondWith(httpResponse);
become(AWAITING_CHUNK);
} else {
if (httpResponse instanceof ReferenceCounted) {
LOG.debug("Retaining reference counted message");
((ReferenceCounted) httpResponse).retain();
}
ReferenceCountUtil.retain(httpResponse);

proxyServer.getMessageProcessingExecutor()
.execute(clientConnection.wrapTask(() -> {
.execute(() -> {
try {
respondWith(httpResponse);
currentFilters.serverToProxyResponseReceived();
become(AWAITING_INITIAL);
clientConnection.wrapTask(() -> {
respondWith(httpResponse);
currentFilters.serverToProxyResponseReceived();
become(AWAITING_INITIAL);
}).run();
} catch (Exception e) {
exceptionCaught(ctx, e);
} finally {
if (httpResponse instanceof ReferenceCounted) {
LOG.debug("Retaining reference counted message");
((ReferenceCounted) httpResponse).release();
}
ReferenceCountUtil.release(httpResponse);
}
}));
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.ReferenceCountUtil;

public class UpstreamConnectionHandler extends ChannelInboundHandlerAdapter {

Expand All @@ -16,14 +15,10 @@ public class UpstreamConnectionHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object request) {
try {
final ConnectionState connectionState =
clientToProxyConnection.setupUpstreamConnection(((Request) request).getShortCircuitResponse(),
((Request) request).getInitialRequest());
clientToProxyConnection.become(connectionState);
} finally {
ReferenceCountUtil.release(((Request) request).getInitialRequest());
}
final ConnectionState connectionState =
clientToProxyConnection.setupUpstreamConnection(((Request) request).getShortCircuitResponse(),
((Request) request).getInitialRequest());
clientToProxyConnection.become(connectionState);
}

@Override
Expand Down