diff --git a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
index cab76a06a..8764d3521 100644
--- a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
+++ b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
@@ -4,6 +4,7 @@
import org.littleshoot.proxy.ratelimit.RateLimiter;
import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
/**
* Configures and starts an {@link HttpProxyServer}. The HttpProxyServer is
@@ -242,6 +243,14 @@ HttpProxyServerBootstrap withProxyToServerExHandler(
HttpProxyServerBootstrap withCustomGlobalState(
GlobalStateHandler globalStateHandler);
+ /**
+ *
+ * @param messageProcessorExecutor
+ * @return
+ */
+ HttpProxyServerBootstrap withMessageProcessingExecutor(
+ ExecutorService messageProcessorExecutor);
+
/**
*
* Specify a {@link HttpFiltersSource} to use for filtering requests and/or
diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
index b476c14c2..4c030e9b0 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
@@ -3,6 +3,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
@@ -21,13 +23,16 @@
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.ReferenceCounted;
+import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
+
import org.apache.commons.lang3.StringUtils;
import org.littleshoot.proxy.ActivityTracker;
import org.littleshoot.proxy.DefaultFailureHttpResponseComposer;
import org.littleshoot.proxy.ExceptionHandler;
import org.littleshoot.proxy.FailureHttpResponseComposer;
+import org.littleshoot.proxy.GlobalStateHandler;
import org.littleshoot.proxy.ratelimit.RateLimiter;
import org.littleshoot.proxy.FlowContext;
import org.littleshoot.proxy.FullFlowContext;
@@ -44,6 +49,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
@@ -143,15 +149,17 @@ public class ClientToProxyConnection extends ProxyConnection {
final DefaultHttpProxyServer proxyServer,
SslEngineSource sslEngineSource,
boolean authenticateClients,
- ChannelPipeline pipeline,
- GlobalTrafficShapingHandler globalTrafficShapingHandler) {
+ GlobalTrafficShapingHandler globalTrafficShapingHandler,
+ Channel channel) {
super(AWAITING_INITIAL, proxyServer, false);
- initChannelPipeline(pipeline);
+ this.channel = channel;
+
+ initChannelPipeline(channel.pipeline());
if (sslEngineSource != null) {
LOG.debug("Enabling encryption of traffic from client to proxy");
- encrypt(pipeline, sslEngineSource.newSslEngine(),
+ encrypt(channel.pipeline(), sslEngineSource.newSslEngine(),
authenticateClients)
.addListener(
new GenericFutureListener>() {
@@ -177,7 +185,8 @@ public void operationComplete(
**************************************************************************/
@Override
- protected ConnectionState readHTTPInitial(HttpRequest httpRequest) {
+ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpRequestObj) {
+ HttpRequest httpRequest = (HttpRequest) httpRequestObj;
LOG.debug("Received raw request: {}", httpRequest);
// if we cannot parse the request, immediately return a 400 and close the connection, since we do not know what state
@@ -195,14 +204,9 @@ protected ConnectionState readHTTPInitial(HttpRequest httpRequest) {
return DISCONNECT_REQUESTED;
}
- boolean authenticationRequired = authenticationRequired(httpRequest);
+ ctx.fireChannelRead(httpRequest);
- if (authenticationRequired) {
- LOG.debug("Not authenticated!!");
- return AWAITING_PROXY_AUTHENTICATION;
- } else {
- return doReadHTTPInitial(httpRequest);
- }
+ return getCurrentState();
}
/**
@@ -223,34 +227,11 @@ protected ConnectionState readHTTPInitial(HttpRequest httpRequest) {
* @param httpRequest
* @return
*/
- private ConnectionState doReadHTTPInitial(HttpRequest httpRequest) {
- // Make a copy of the original request
- final HttpRequest currentRequest = copy(httpRequest);
-
- // Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response
- // should not be filtered), fall back to the default no-op filter source.
- HttpFilters filterInstance;
- try {
- filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx);
- } finally {
- // releasing a copied http request
- if (currentRequest instanceof ReferenceCounted) {
- ((ReferenceCounted)currentRequest).release();
- }
- }
- if (filterInstance != null) {
- currentFilters = filterInstance;
- } else {
- currentFilters = HttpFiltersAdapter.NOOP_FILTER;
- }
-
- // Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required
- HttpResponse clientToProxyFilterResponse = currentFilters.clientToProxyRequest(httpRequest);
+ public ConnectionState setupUpstreamConnection(HttpResponse shortCircuitResponse, HttpRequest httpRequest) {
+ if (shortCircuitResponse != null) {
+ LOG.debug("Responding to client with short-circuit response from filter: {}", shortCircuitResponse);
- if (clientToProxyFilterResponse != null) {
- LOG.debug("Responding to client with short-circuit response from filter: {}", clientToProxyFilterResponse);
-
- boolean keepAlive = respondWithShortCircuitResponse(clientToProxyFilterResponse);
+ boolean keepAlive = respondWithShortCircuitResponse(shortCircuitResponse);
if (keepAlive) {
return AWAITING_INITIAL;
} else {
@@ -366,6 +347,96 @@ private ConnectionState doReadHTTPInitial(HttpRequest httpRequest) {
}
}
+ @Sharable
+ protected class ClientToProxyMessageProcessor extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ final HttpRequest httpRequest = (HttpRequest) msg;
+
+ if (ProxyUtils.isChunked(httpRequest)) {
+ process(ctx, httpRequest);
+ } else {
+ if (httpRequest instanceof ReferenceCounted) {
+ LOG.debug("Retaining reference counted message");
+ ((ReferenceCounted) msg).retain();
+ }
+
+ proxyServer.getMessageProcessingExecutor()
+ .execute(wrapTask(() -> {
+ try {
+ process(ctx, httpRequest);
+ } catch (Exception e) {
+ ctx.fireExceptionCaught(e);
+ } finally {
+ if (httpRequest instanceof ReferenceCounted) {
+ LOG.debug("Retaining reference counted message");
+ ((ReferenceCounted) httpRequest).release();
+ }
+ }
+ }));
+ }
+ }
+
+ private void process(ChannelHandlerContext ctx, HttpRequest httpRequest) {
+
+ boolean authenticationRequired = false;
+ HttpResponse shortCircuitResponse = null;
+ authenticationRequired = authenticationRequired(httpRequest);
+
+ if (authenticationRequired) {
+ LOG.debug("Not authenticated!!");
+ become(AWAITING_PROXY_AUTHENTICATION);
+ } else {
+ // Make a copy of the original request
+ final HttpRequest currentRequest = copy(httpRequest);
+
+ // Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response
+ // should not be filtered), fall back to the default no-op filter source.
+ HttpFilters filterInstance;
+ try {
+ filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx);
+ } finally {
+ // releasing a copied http request
+ if (currentRequest instanceof ReferenceCounted) {
+ ((ReferenceCounted) currentRequest).release();
+ }
+ }
+ if (filterInstance != null) {
+ currentFilters = filterInstance;
+ } else {
+ currentFilters = HttpFiltersAdapter.NOOP_FILTER;
+ }
+
+ // Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required
+ shortCircuitResponse = currentFilters.clientToProxyRequest(httpRequest);
+
+ }
+
+ if (!authenticationRequired) {
+ if (httpRequest instanceof ReferenceCounted) {
+ LOG.debug("Retaining reference counted message");
+ ((ReferenceCounted) httpRequest).retain();
+ }
+
+ ctx.fireChannelRead(new UpstreamConnectionHandler.Request(httpRequest, shortCircuitResponse));
+ }
+ }
+ }
+
+ Runnable wrapTask(Runnable task) {
+ return () -> {
+ final Optional globalStateHandler =
+ Optional.ofNullable(proxyServer.getGlobalStateHandler());
+ try {
+ globalStateHandler.ifPresent(it -> it.restoreFromChannel(channel));
+ task.run();
+ } finally {
+ globalStateHandler.ifPresent(GlobalStateHandler::clear);
+ }
+ };
+ }
+
/**
* Returns true if the specified request is a request to an origin server, rather than to a proxy server. If this
* request is being MITM'd, this method always returns false. The format of requests to a proxy server are defined
@@ -800,8 +871,10 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(this));
}
- pipeline.addLast("bytesReadMonitor", bytesReadMonitor);
- pipeline.addLast("bytesWrittenMonitor", bytesWrittenMonitor);
+ EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(this);
+
+ pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
pipeline.addLast("encoder", new HttpResponseEncoder());
// We want to allow longer request lines, headers, and chunks
@@ -818,8 +891,8 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
aggregateContentForFiltering(pipeline, numberOfBytesToBuffer);
}
- pipeline.addLast("requestReadMonitor", requestReadMonitor);
- pipeline.addLast("responseWrittenMonitor", responseWrittenMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "requestReadMonitor", requestReadMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "responseWrittenMonitor", responseWrittenMonitor);
pipeline.addLast(
"idle",
@@ -830,7 +903,10 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(this));
}
- pipeline.addLast("handler", this);
+ pipeline.addLast(globalStateWrapperEvenLoop, "router", this);
+ pipeline.addLast(globalStateWrapperEvenLoop, "httpInitialHandler", new HttpInitialHandler<>(this));
+ pipeline.addLast(globalStateWrapperEvenLoop, "clientToProxyMessageProcessor", new ClientToProxyMessageProcessor());
+ pipeline.addLast(globalStateWrapperEvenLoop, "upstreamConnectionHandler", new UpstreamConnectionHandler(this));
}
diff --git a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
index aaee33af8..65a9a9ce4 100644
--- a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
+++ b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
@@ -51,6 +51,7 @@
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -551,8 +552,8 @@ protected void initChannel(Channel ch) throws Exception {
DefaultHttpProxyServer.this,
sslEngineSource,
authenticateSslClients,
- ch.pipeline(),
- globalTrafficShapingHandler);
+ globalTrafficShapingHandler,
+ ch);
};
};
switch (transportProtocol) {
@@ -620,6 +621,10 @@ protected GlobalStateHandler getGlobalStateHandler() {
return globalStateHandler;
}
+ protected ExecutorService getMessageProcessingExecutor() {
+ return serverGroup.getMessageProcessingExecutor();
+ }
+
protected RequestTracer getRequestTracer() {
return requestTracer;
}
@@ -670,6 +675,7 @@ private static class DefaultHttpProxyServerBootstrap implements HttpProxyServerB
private ExceptionHandler proxyToServerExHandler = null;
private RequestTracer requestTracer = null;
private GlobalStateHandler globalStateHandler = null;
+ private ExecutorService messageProcessorExecutor = null;
private HttpFiltersSource filtersSource = new HttpFiltersSourceAdapter();
private FailureHttpResponseComposer unrecoverableFailureHttpResponseComposer = new DefaultFailureHttpResponseComposer();
private boolean transparent = false;
@@ -895,6 +901,13 @@ public HttpProxyServerBootstrap withCustomGlobalState(
return this;
}
+ @Override
+ public HttpProxyServerBootstrap withMessageProcessingExecutor(
+ ExecutorService messageProcessorExecutor) {
+ this.messageProcessorExecutor = messageProcessorExecutor;
+ return this;
+ }
+
@Override
public HttpProxyServerBootstrap withFiltersSource(
HttpFiltersSource filtersSource) {
@@ -1010,7 +1023,8 @@ private DefaultHttpProxyServer build() {
serverGroup = this.serverGroup;
}
else {
- serverGroup = new ServerGroup(name, clientToProxyAcceptorThreads, clientToProxyWorkerThreads, proxyToServerWorkerThreads);
+ serverGroup = new ServerGroup(name, clientToProxyAcceptorThreads,
+ clientToProxyWorkerThreads, proxyToServerWorkerThreads, messageProcessorExecutor);
}
return new DefaultHttpProxyServer(serverGroup,
diff --git a/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java b/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java
new file mode 100644
index 000000000..f3df53ed9
--- /dev/null
+++ b/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java
@@ -0,0 +1,183 @@
+package org.littleshoot.proxy.impl;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ProgressivePromise;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+
+public class GlobalStateWrapperEvenLoop implements EventExecutor {
+
+ private final ClientToProxyConnection connection;
+
+ private final EventExecutor eventLoop;
+
+ GlobalStateWrapperEvenLoop(ClientToProxyConnection connection) {
+ this.connection = connection;
+ this.eventLoop = connection.channel.eventLoop();
+ }
+
+ GlobalStateWrapperEvenLoop(ClientToProxyConnection connection, EventExecutor eventLoop) {
+ this.connection = connection;
+ this.eventLoop = eventLoop;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ eventLoop.execute(connection.wrapTask(command));
+ }
+
+ @Override
+ public boolean isShuttingDown() {
+ return eventLoop.isShuttingDown();
+ }
+
+ @Override
+ public Future> shutdownGracefully() {
+ return eventLoop.shutdownGracefully();
+ }
+
+ @Override
+ public Future> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
+ return eventLoop.shutdownGracefully(quietPeriod, timeout, unit);
+ }
+
+ @Override
+ public Future> terminationFuture() {
+ return eventLoop.terminationFuture();
+ }
+
+ @Override
+ public void shutdown() {
+ eventLoop.shutdown();
+ }
+
+ @Override
+ public List shutdownNow() {
+ return eventLoop.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return eventLoop.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return eventLoop.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return eventLoop.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public EventExecutor next() {
+ return this;
+ }
+
+ @Override
+ public Iterator iterator() {
+ return eventLoop.iterator();
+ }
+
+ @Override
+ public Future> submit(Runnable task) {
+ return eventLoop.submit(connection.wrapTask(task));
+ }
+
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks) throws InterruptedException {
+ return eventLoop.invokeAll(tasks);
+ }
+
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ return eventLoop.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public T invokeAny(Collection extends Callable> tasks) throws InterruptedException, ExecutionException {
+ return eventLoop.invokeAny(tasks);
+ }
+
+ @Override
+ public T invokeAny(Collection extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return eventLoop.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public Future submit(Runnable task, T result) {
+ return eventLoop.submit(connection.wrapTask(task), result);
+ }
+
+ @Override
+ public Future submit(Callable task) {
+ return eventLoop.submit(task);
+ }
+
+ @Override
+ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) {
+ return eventLoop.schedule(command, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) {
+ return eventLoop.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return eventLoop.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return eventLoop.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+ @Override
+ public EventExecutorGroup parent() {
+ return eventLoop.parent();
+ }
+
+ @Override
+ public boolean inEventLoop() {
+ return eventLoop.inEventLoop();
+ }
+
+ @Override
+ public boolean inEventLoop(Thread thread) {
+ return eventLoop.inEventLoop(thread);
+ }
+
+ @Override
+ public Promise newPromise() {
+ return eventLoop.newPromise();
+ }
+
+ @Override
+ public ProgressivePromise newProgressivePromise() {
+ return eventLoop.newProgressivePromise();
+ }
+
+ @Override
+ public Future newSucceededFuture(V result) {
+ return eventLoop.newSucceededFuture(result);
+ }
+
+ @Override
+ public Future newFailedFuture(Throwable cause) {
+ return eventLoop.newFailedFuture(cause);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java b/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java
new file mode 100644
index 000000000..e715bb8ee
--- /dev/null
+++ b/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java
@@ -0,0 +1,25 @@
+package org.littleshoot.proxy.impl;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.HttpObject;
+
+public class HttpInitialHandler extends ChannelInboundHandlerAdapter {
+
+ private final ProxyConnection proxyConnection;
+
+ HttpInitialHandler(ProxyConnection proxyConnection) {
+ this.proxyConnection = proxyConnection;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ final ConnectionState connectionState = proxyConnection.readHTTPInitial(ctx, msg);
+ proxyConnection.become(connectionState);
+ }
+
+ @Override
+ public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ proxyConnection.exceptionCaught(cause);
+ }
+}
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
index 1f4ee876b..eed893c22 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
@@ -129,11 +129,10 @@ protected void read(Object msg) {
*/
@SuppressWarnings("unchecked")
private void readHTTP(HttpObject httpObject) {
- ConnectionState nextState = getCurrentState();
switch (getCurrentState()) {
case AWAITING_INITIAL:
if (httpObject instanceof HttpMessage) {
- nextState = readHTTPInitial((I) httpObject);
+ ctx.fireChannelRead(httpObject);
} else {
// Similar to the AWAITING_PROXY_AUTHENTICATION case below, we may enter an AWAITING_INITIAL
// state if the proxy responded to an earlier request with a 502 or 504 response, or a short-circuit
@@ -145,13 +144,13 @@ private void readHTTP(HttpObject httpObject) {
case AWAITING_CHUNK:
HttpContent chunk = (HttpContent) httpObject;
readHTTPChunk(chunk);
- nextState = ProxyUtils.isLastChunk(chunk) ? AWAITING_INITIAL
- : AWAITING_CHUNK;
+ become(ProxyUtils.isLastChunk(chunk) ? AWAITING_INITIAL
+ : AWAITING_CHUNK);
break;
case AWAITING_PROXY_AUTHENTICATION:
if (httpObject instanceof HttpRequest) {
// Once we get an HttpRequest, try to process it as usual
- nextState = readHTTPInitial((I) httpObject);
+ ctx.fireChannelRead(httpObject);
} else {
// Anything that's not an HttpRequest that came in while
// we're pending authentication gets dropped on the floor. This
@@ -179,7 +178,6 @@ private void readHTTP(HttpObject httpObject) {
LOG.info("Ignoring message since the connection is closed or about to close");
break;
}
- become(nextState);
}
/**
@@ -189,7 +187,7 @@ private void readHTTP(HttpObject httpObject) {
* @param httpObject
* @return
*/
- protected abstract ConnectionState readHTTPInitial(I httpObject);
+ protected abstract ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpObject);
/**
* Implement this to handle reading a chunk in a chunked transfer.
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index 96e470864..6a1c0cf02 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -8,6 +8,8 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@@ -31,6 +33,7 @@
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCounted;
+import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.littleshoot.proxy.ActivityTracker;
@@ -221,7 +224,8 @@ protected void read(Object msg) {
}
@Override
- protected ConnectionState readHTTPInitial(HttpResponse httpResponse) {
+ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpResponseObj) {
+ HttpResponse httpResponse = (HttpResponse) httpResponseObj;
LOG.debug("Received raw response: {}", httpResponse);
if (httpResponse.getDecoderResult().isFailure()) {
@@ -240,14 +244,47 @@ protected ConnectionState readHTTPInitial(HttpResponse httpResponse) {
currentFilters.serverToProxyResponseReceiving();
rememberCurrentResponse(httpResponse);
- respondWith(httpResponse);
- if (ProxyUtils.isChunked(httpResponse)) {
- return AWAITING_CHUNK;
- } else {
- currentFilters.serverToProxyResponseReceived();
+ ctx.fireChannelRead(httpResponse);
+
+ return getCurrentState();
+ }
- return AWAITING_INITIAL;
+ public class RespondToClientHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ final HttpResponse httpResponse = (HttpResponse) msg;
+ if (ProxyUtils.isChunked(httpResponse)) {
+ respondWith(httpResponse);
+ become(AWAITING_CHUNK);
+ } else {
+ if (httpResponse instanceof ReferenceCounted) {
+ LOG.debug("Retaining reference counted message");
+ ((ReferenceCounted) httpResponse).retain();
+ }
+
+ proxyServer.getMessageProcessingExecutor()
+ .execute(clientConnection.wrapTask(() -> {
+ try {
+ respondWith(httpResponse);
+ currentFilters.serverToProxyResponseReceived();
+ become(AWAITING_INITIAL);
+ } catch (Exception e) {
+ exceptionCaught(ctx, e);
+ } finally {
+ if (httpResponse instanceof ReferenceCounted) {
+ LOG.debug("Retaining reference counted message");
+ ((ReferenceCounted) httpResponse).release();
+ }
+ }
+ }));
+ }
+ }
+
+ @Override
+ public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ serverConnection.exceptionCaught(cause);
}
}
@@ -643,8 +680,8 @@ public Channel newChannel() {
cb.handler(new ChannelInitializer() {
protected void initChannel(Channel ch) throws Exception {
- initChannelPipeline(ch.pipeline(), initialRequest);
- };
+ initChannelPipeline(ch.pipeline(), ch);
+ }
});
cb.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
proxyServer.getConnectTimeout());
@@ -829,6 +866,8 @@ protected boolean connectionFailed(Throwable cause)
private void resetConnectionForRetry() throws UnknownHostException {
// Remove ourselves as handler on the old context
this.ctx.pipeline().remove(this);
+ this.ctx.pipeline().remove("httpInitialHandler");
+ this.ctx.pipeline().remove("respondToClientHandler");
this.ctx.close();
this.ctx = null;
@@ -895,8 +934,7 @@ private void setupConnectionParameters() throws UnknownHostException {
* @param pipeline
* @param httpRequest
*/
- private void initChannelPipeline(ChannelPipeline pipeline,
- HttpRequest httpRequest) {
+ private void initChannelPipeline(ChannelPipeline pipeline, Channel channel) {
if (proxyServer.getGlobalStateHandler() != null) {
pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(clientConnection));
@@ -906,8 +944,10 @@ private void initChannelPipeline(ChannelPipeline pipeline,
pipeline.addLast("global-traffic-shaping", trafficHandler);
}
- pipeline.addLast("bytesReadMonitor", bytesReadMonitor);
- pipeline.addLast("bytesWrittenMonitor", bytesWrittenMonitor);
+ final EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(clientConnection, channel.eventLoop());
+
+ pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("decoder", new HeadAwareHttpResponseDecoder(
@@ -922,8 +962,8 @@ private void initChannelPipeline(ChannelPipeline pipeline,
aggregateContentForFiltering(pipeline, numberOfBytesToBuffer);
}
- pipeline.addLast("responseReadMonitor", responseReadMonitor);
- pipeline.addLast("requestWrittenMonitor", requestWrittenMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "responseReadMonitor", responseReadMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "requestWrittenMonitor", requestWrittenMonitor);
// Set idle timeout
pipeline.addLast(
@@ -935,7 +975,9 @@ private void initChannelPipeline(ChannelPipeline pipeline,
pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(clientConnection));
}
- pipeline.addLast("handler", this);
+ pipeline.addLast(globalStateWrapperEvenLoop, "router", this);
+ pipeline.addLast(globalStateWrapperEvenLoop, "httpInitialHandler", new HttpInitialHandler<>(this));
+ pipeline.addLast(globalStateWrapperEvenLoop, "respondToClientHandler", new RespondToClientHandler());
}
/**
diff --git a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
index d359c7be1..dbd898305 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
@@ -12,6 +12,8 @@
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -72,6 +74,8 @@ public class ServerGroup {
*/
private final EnumMap protocolThreadPools = new EnumMap(TransportProtocol.class);
+ private final ExecutorService messageProcessingExecutor;
+
/**
* A mapping of selector providers to transport protocols. Avoids special-casing each transport protocol during
* transport protocol initialization.
@@ -104,12 +108,19 @@ public class ServerGroup {
* @param incomingWorkerThreads number of client-to-proxy worker threads per protocol
* @param outgoingWorkerThreads number of proxy-to-server worker threads per protocol
*/
- public ServerGroup(String name, int incomingAcceptorThreads, int incomingWorkerThreads, int outgoingWorkerThreads) {
+ public ServerGroup(String name, int incomingAcceptorThreads,
+ int incomingWorkerThreads, int outgoingWorkerThreads,
+ ExecutorService messageProcessingExecutor) {
this.name = name;
this.serverGroupId = serverGroupCount.getAndIncrement();
this.incomingAcceptorThreads = incomingAcceptorThreads;
this.incomingWorkerThreads = incomingWorkerThreads;
this.outgoingWorkerThreads = outgoingWorkerThreads;
+ if (messageProcessingExecutor == null) {
+ this.messageProcessingExecutor = Executors.newCachedThreadPool();
+ } else {
+ this.messageProcessingExecutor = messageProcessingExecutor;
+ }
}
/**
@@ -219,6 +230,8 @@ private void shutdown(boolean graceful) {
allEventLoopGroups.addAll(threadPools.getAllEventLoops());
}
+ shutdownAndAwaitTermination(messageProcessingExecutor);
+
for (EventLoopGroup group : allEventLoopGroups) {
if (graceful) {
group.shutdownGracefully();
@@ -242,6 +255,24 @@ private void shutdown(boolean graceful) {
log.debug("Done shutting down server group");
}
+ private void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(60, TimeUnit.SECONDS))
+ log.warn("Pool did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
/**
* Retrieves the client-to-proxy acceptor thread pool for the specified protocol. Initializes the pool if it has not
* yet been initialized.
@@ -281,6 +312,10 @@ public EventLoopGroup getProxyToServerWorkerPoolForTransport(TransportProtocol p
return getThreadPoolsForProtocol(protocol).getProxyToServerWorkerPool();
}
+ public ExecutorService getMessageProcessingExecutor() {
+ return messageProcessingExecutor;
+ }
+
/**
* @return true if this ServerGroup has already been stopped
*/
diff --git a/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java b/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java
new file mode 100644
index 000000000..612384d95
--- /dev/null
+++ b/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java
@@ -0,0 +1,46 @@
+package org.littleshoot.proxy.impl;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.util.ReferenceCountUtil;
+
+public class UpstreamConnectionHandler extends ChannelInboundHandlerAdapter {
+
+ private final ClientToProxyConnection clientToProxyConnection;
+
+ UpstreamConnectionHandler(ClientToProxyConnection clientToProxyConnection) {
+ this.clientToProxyConnection = clientToProxyConnection;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object request) {
+ try {
+ final ConnectionState connectionState =
+ clientToProxyConnection.setupUpstreamConnection(((Request) request).getShortCircuitResponse(),
+ ((Request) request).getInitialRequest());
+ clientToProxyConnection.become(connectionState);
+ } finally {
+ ReferenceCountUtil.release(((Request) request).getInitialRequest());
+ }
+ }
+
+ @Override
+ public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ clientToProxyConnection.exceptionCaught(cause);
+ }
+
+ public static class Request {
+ private final HttpRequest initialRequest;
+ private final HttpResponse shortCircuitResponse;
+
+ public Request(HttpRequest initialRequest, HttpResponse shortCircuitResponse) {
+ this.initialRequest = initialRequest;
+ this.shortCircuitResponse = shortCircuitResponse;
+ }
+
+ HttpRequest getInitialRequest() { return initialRequest; }
+ HttpResponse getShortCircuitResponse() { return shortCircuitResponse; }
+ }
+}
diff --git a/src/test/java/org/littleshoot/proxy/ServerGroupTest.java b/src/test/java/org/littleshoot/proxy/ServerGroupTest.java
index 629a47c2f..e236e4b67 100644
--- a/src/test/java/org/littleshoot/proxy/ServerGroupTest.java
+++ b/src/test/java/org/littleshoot/proxy/ServerGroupTest.java
@@ -4,7 +4,9 @@
import io.netty.handler.codec.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.littleshoot.proxy.impl.DefaultHttpProxyServer;
import org.littleshoot.proxy.impl.ThreadPoolConfiguration;
@@ -12,82 +14,280 @@
import org.mockserver.integration.ClientAndServer;
import org.mockserver.matchers.Times;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
+
+// set up two server responses that will execute more or less simultaneously. the first request has a small
+// delay, to reduce the chance that the first request will finish entirely before the second request is finished
+// (and thus be somewhat more likely to be serviced by the same thread, even if the ThreadPoolConfiguration is
+// not behaving properly).
+
+
+// save the names of the threads that execute the filter methods. filter methods are executed by the worker thread
+// handling the request/response, so if there is only one worker thread, the filter methods should be executed
+// by the same thread.
+
public class ServerGroupTest {
private ClientAndServer mockServer;
private int mockServerPort;
- private HttpProxyServer proxyServer;
+ final String firstRequestPath = "/testSingleThreadFirstRequest";
+ final String secondRequestPath = "/testSingleThreadSecondRequest";
+ final String messageProcessingThreadName = UUID.randomUUID().toString();
+
+ final AtomicReference firstClientThreadName = new AtomicReference();
+ final AtomicReference secondClientThreadName = new AtomicReference();
+
+ final AtomicReference firstProxyThreadName = new AtomicReference();
+ final AtomicReference secondProxyThreadName = new AtomicReference();
@Before
public void setUp() {
mockServer = new ClientAndServer(0);
mockServerPort = mockServer.getPort();
+
+ mockServer.when(request()
+ .withMethod("GET")
+ .withPath(firstRequestPath),
+ Times.exactly(1))
+ .respond(response()
+ .withStatusCode(200)
+ .withBody("first")
+ .withDelay(TimeUnit.MILLISECONDS, 500)
+ );
+
+ mockServer.when(request()
+ .withMethod("GET")
+ .withPath(secondRequestPath),
+ Times.exactly(1))
+ .respond(response()
+ .withStatusCode(200)
+ .withBody("second")
+ );
}
@After
public void tearDown() {
+ firstClientThreadName.set(null);
+ secondClientThreadName.set(null);
+ firstProxyThreadName.set(null);
+ secondProxyThreadName.set(null);
+ if (mockServer != null) {
+ mockServer.stop();
+ }
+ }
+
+ @Test
+ public void testChunkedRequest() throws ExecutionException, InterruptedException {
+
+ final HttpProxyServer proxyServer = getProxy(2, false,
+ false, false, false, false,
+ false, false, false, true);
+
+ final Futures futures = runTwoRequests(proxyServer);
+
+ futures.getFirstFuture().get();
+ futures.getSecondFuture().get();
+
+ assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get());
+ assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get());
+
+ assertNotEquals(firstClientThreadName.get(), messageProcessingThreadName);
+ assertNotEquals(firstProxyThreadName.get(), messageProcessingThreadName);
+ }
+
+ @Test
+ public void testBlockFirstRequest() throws ExecutionException, InterruptedException {
+
+ final HttpProxyServer proxyServer = getProxy(2, true,
+ false, false, false, false,
+ false, false, false, false);
+
+ final Futures futures = runTwoRequests(proxyServer);
+
+ try {
+ futures.getSecondFuture().get(2, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ fail("Second request took longer than expected");
+ }
+
+ boolean firstStillExecuting = false;
+ try {
+ futures.getFirstFuture().get(2, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ firstStillExecuting = true;
+ }
+
+ Assert.assertTrue("First request must be still executing", firstStillExecuting);
+
try {
- if (mockServer != null) {
- mockServer.stop();
- }
- } finally {
- if (proxyServer != null) {
- proxyServer.abort();
- }
+ futures.getFirstFuture().get(3, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ fail("First request took longer than expected");
}
+
+ assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get());
+ assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get());
+
+ assertEquals(firstClientThreadName.get(), messageProcessingThreadName);
+ assertEquals(firstProxyThreadName.get(), messageProcessingThreadName);
}
@Test
- public void testSingleWorkerThreadPoolConfiguration() throws ExecutionException, InterruptedException {
- final String firstRequestPath = "/testSingleThreadFirstRequest";
- final String secondRequestPath = "/testSingleThreadSecondRequest";
-
- // set up two server responses that will execute more or less simultaneously. the first request has a small
- // delay, to reduce the chance that the first request will finish entirely before the second request is finished
- // (and thus be somewhat more likely to be serviced by the same thread, even if the ThreadPoolConfiguration is
- // not behaving properly).
- mockServer.when(request()
- .withMethod("GET")
- .withPath(firstRequestPath),
- Times.exactly(1))
- .respond(response()
- .withStatusCode(200)
- .withBody("first")
- .withDelay(TimeUnit.MILLISECONDS, 500)
- );
+ public void testBlockFirstResponse() throws ExecutionException, InterruptedException {
- mockServer.when(request()
- .withMethod("GET")
- .withPath(secondRequestPath),
- Times.exactly(1))
- .respond(response()
- .withStatusCode(200)
- .withBody("second")
- );
-
- // save the names of the threads that execute the filter methods. filter methods are executed by the worker thread
- // handling the request/response, so if there is only one worker thread, the filter methods should be executed
- // by the same thread.
- final AtomicReference firstClientThreadName = new AtomicReference();
- final AtomicReference secondClientThreadName = new AtomicReference();
-
- final AtomicReference firstProxyThreadName = new AtomicReference();
- final AtomicReference secondProxyThreadName = new AtomicReference();
-
- proxyServer = DefaultHttpProxyServer.bootstrap()
+ final HttpProxyServer proxyServer = getProxy(2, false,
+ false, true, false, false,
+ false, false, false, false);
+
+ final Futures futures = runTwoRequests(proxyServer);
+
+ try {
+ futures.getSecondFuture().get(2, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ fail("Second request took longer than expected");
+ }
+
+ boolean firstStillExecuting = false;
+ try {
+ futures.getFirstFuture().get(2, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ firstStillExecuting = true;
+ }
+
+ Assert.assertTrue("First request must be still executing", firstStillExecuting);
+
+ try {
+ futures.getFirstFuture().get(3, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ fail("First request took longer than expected");
+ }
+
+ assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get());
+ assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get());
+
+ assertEquals(firstClientThreadName.get(), messageProcessingThreadName);
+ assertEquals(firstProxyThreadName.get(), messageProcessingThreadName);
+ }
+
+ @Test(expected = ExecutionException.class)
+ public void testExceptionFirstRequest() throws ExecutionException, InterruptedException {
+
+ final HttpProxyServer proxyServer = getProxy(2, false,
+ false, false, false, true,
+ false, false, false, false);
+
+ final Futures futures = runTwoRequests(proxyServer);
+
+ futures.getFirstFuture().get();
+ futures.getSecondFuture().get();
+ }
+
+ @Test(expected = ExecutionException.class)
+ @Ignore // for some reason the test hangs even with original logic
+ public void testExceptionFirstResponse() throws ExecutionException, InterruptedException {
+
+ final HttpProxyServer proxyServer = getProxy(2, false,
+ false, false, false, false,
+ false, true, false, false);
+
+ final Futures futures = runTwoRequests(proxyServer);
+
+ futures.getFirstFuture().get();
+ futures.getSecondFuture().get();
+ }
+
+ @Test
+ public void testBlockFirstRequestSingleProcessingThread() throws ExecutionException, InterruptedException {
+
+ final HttpProxyServer proxyServer = getProxy(1, true,
+ false, false, false, false,
+ false, false, false, false);
+
+ final Futures futures = runTwoRequests(proxyServer);
+
+ boolean secondStillExecuting = false;
+ try {
+ futures.getSecondFuture().get(2, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ secondStillExecuting = true;
+ }
+
+ Assert.assertTrue("Second request must be still executing", secondStillExecuting);
+
+ boolean firstStillExecuting = false;
+ try {
+ futures.getFirstFuture().get(2, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ firstStillExecuting = true;
+ }
+
+ Assert.assertTrue("First request must be still executing", firstStillExecuting);
+
+ try {
+ futures.getFirstFuture().get(3, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ fail("First request took longer than expected");
+ }
+
+ try {
+ futures.getSecondFuture().get(3, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ fail("Second request took longer than expected");
+ }
+
+ assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get());
+ assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get());
+
+ assertEquals(firstClientThreadName.get(), messageProcessingThreadName);
+ assertEquals(firstProxyThreadName.get(), messageProcessingThreadName);
+ }
+
+ private HttpProxyServer getProxy(int processingThreads,
+ boolean blockFirstRequest,
+ boolean blockSecondRequest,
+ boolean blockFirstResponse,
+ boolean blockSecondResponse,
+ boolean throwFirstRequestException,
+ boolean throwSecondRequestException,
+ boolean throwFirstResponseException,
+ boolean throwSecondResponseException,
+ boolean withChunks) {
+ return DefaultHttpProxyServer.bootstrap()
.withPort(0)
.withFiltersSource(new HttpFiltersSourceAdapter() {
+
+ // required so chinks for used
+ @Override
+ public int getMaximumRequestBufferSizeInBytes() {
+ if (withChunks) {
+ return 0;
+ }
+ return 8388608 * 2;
+ }
+
+ @Override
+ public int getMaximumResponseBufferSizeInBytes() {
+ if (withChunks) {
+ return 0;
+ }
+ return 8388608 * 2;
+ }
+
+
@Override
public HttpFilters filterRequest(HttpRequest originalRequest) {
return new HttpFiltersAdapter(originalRequest) {
@@ -95,20 +295,55 @@ public HttpFilters filterRequest(HttpRequest originalRequest) {
public io.netty.handler.codec.http.HttpResponse clientToProxyRequest(HttpObject httpObject) {
if (originalRequest.getUri().endsWith(firstRequestPath)) {
firstClientThreadName.set(Thread.currentThread().getName());
+
+ if (throwFirstRequestException) {
+ throw new RuntimeException("first-request");
+ }
+
+ if (blockFirstRequest) {
+ block();
+ }
+
} else if (originalRequest.getUri().endsWith(secondRequestPath)) {
secondClientThreadName.set(Thread.currentThread().getName());
+
+ if (throwSecondRequestException) {
+ throw new RuntimeException("second-request");
+ }
+
+ if (blockSecondRequest) {
+ block();
+ }
}
return super.clientToProxyRequest(httpObject);
}
@Override
- public void serverToProxyResponseReceived() {
+ public HttpObject serverToProxyResponse(HttpObject httpObject) {
if (originalRequest.getUri().endsWith(firstRequestPath)) {
firstProxyThreadName.set(Thread.currentThread().getName());
+
+ if (throwFirstResponseException) {
+ throw new RuntimeException("first-response");
+ }
+
+ if (blockFirstResponse) {
+ block();
+ }
+
} else if (originalRequest.getUri().endsWith(secondRequestPath)) {
secondProxyThreadName.set(Thread.currentThread().getName());
+
+ if (throwSecondResponseException) {
+ throw new RuntimeException("second-response");
+ }
+
+ if (blockSecondResponse) {
+ block();
+ }
}
+ return httpObject;
}
};
}
@@ -117,37 +352,60 @@ public void serverToProxyResponseReceived() {
.withAcceptorThreads(1)
.withClientToProxyWorkerThreads(1)
.withProxyToServerWorkerThreads(1))
+ .withMessageProcessingExecutor(Executors.newFixedThreadPool(processingThreads, r -> {
+ final Thread thread = new Thread(r);
+ thread.setName(messageProcessingThreadName);
+ return thread;
+ }))
.start();
+ }
+ private void block() {
+ try {
+ Thread.sleep(4000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private Futures runTwoRequests(HttpProxyServer proxyServer) throws InterruptedException {
// execute both requests in parallel, to increase the chance of blocking due to the single-threaded ThreadPoolConfiguration
- Runnable firstRequest = new Runnable() {
- @Override
- public void run() {
- HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + firstRequestPath, proxyServer);
- assertEquals(200, response.getStatusLine().getStatusCode());
- }
+ final Runnable firstRequest = () -> {
+ HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + firstRequestPath, proxyServer);
+ assertEquals(200, response.getStatusLine().getStatusCode());
};
- Runnable secondRequest = new Runnable () {
- @Override
- public void run() {
- HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + secondRequestPath, proxyServer);
- assertEquals(200, response.getStatusLine().getStatusCode());
- }
+ final Runnable secondRequest = () -> {
+ HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + secondRequestPath, proxyServer);
+ assertEquals(200, response.getStatusLine().getStatusCode());
};
ExecutorService executor = Executors.newFixedThreadPool(2);
Future> firstFuture = executor.submit(firstRequest);
+ Thread.sleep(500);
Future> secondFuture = executor.submit(secondRequest);
- firstFuture.get();
- secondFuture.get();
+ return new Futures(firstFuture, secondFuture);
+ }
- Thread.sleep(500);
+ private static class Futures {
+ Future> getFirstFuture() {
+ return firstFuture;
+ }
- assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get());
- assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get());
+ Future> getSecondFuture() {
+ return secondFuture;
+ }
+
+ final Future> firstFuture;
+ final Future> secondFuture;
+
+ private Futures(Future> firstFuture, Future> secondFuture) {
+ this.firstFuture = firstFuture;
+ this.secondFuture = secondFuture;
+ }
}
+
}