Date: Sat, 5 Jan 2019 00:02:54 +0200
Subject: [PATCH 17/43] remove hardcoded processing executor
---
.../proxy/HttpProxyServerBootstrap.java | 9 ++++++++
.../proxy/impl/ClientToProxyConnection.java | 8 ++-----
.../proxy/impl/DefaultHttpProxyServer.java | 21 +++++++++++++++++--
.../proxy/impl/ProxyToServerConnection.java | 7 ++-----
.../littleshoot/proxy/impl/ServerGroup.java | 21 ++++++++++++++++++-
5 files changed, 52 insertions(+), 14 deletions(-)
diff --git a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
index cab76a06a..3bbc6f910 100644
--- a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
+++ b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
@@ -4,6 +4,7 @@
import org.littleshoot.proxy.ratelimit.RateLimiter;
import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
/**
* Configures and starts an {@link HttpProxyServer}. The HttpProxyServer is
@@ -242,6 +243,14 @@ HttpProxyServerBootstrap withProxyToServerExHandler(
HttpProxyServerBootstrap withCustomGlobalState(
GlobalStateHandler globalStateHandler);
+ /**
+ *
+ * @param payloadProcessorExecutor
+ * @return
+ */
+ HttpProxyServerBootstrap withPyaloadProcessorExecutor(
+ ExecutorService payloadProcessorExecutor);
+
/**
*
* Specify a {@link HttpFiltersSource} to use for filtering requests and/or
diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
index 13372cf80..f592a754a 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
@@ -28,7 +28,6 @@
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.ReferenceCounted;
-import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.commons.lang3.StringUtils;
@@ -54,8 +53,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -354,8 +351,6 @@ public ConnectionState doReadHTTPInitial(HttpRequest httpRequest) {
}
}
- private static final Executor executor = Executors.newCachedThreadPool();
-
@Sharable
protected class ClientToProxyProcessor extends ChannelDuplexHandler {
@@ -372,7 +367,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (ProxyUtils.isChunked(httpRequest)) {
process(ctx, msg, httpRequest);
} else {
- executor.execute(wrapTask(() -> process(ctx, msg, httpRequest)));
+ proxyServer.getPayloadProcessorExecutor()
+ .execute(wrapTask(() -> process(ctx, msg, httpRequest)));
}
}
diff --git a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
index 6a55f508a..753e4d99e 100644
--- a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
+++ b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
@@ -51,6 +51,7 @@
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -119,6 +120,7 @@ public class DefaultHttpProxyServer implements HttpProxyServer {
private final ExceptionHandler proxyToServerExHandler;
private final RequestTracer requestTracer;
private final GlobalStateHandler globalStateHandler;
+ private final ExecutorService payloadProcessorExecutor;
private final HttpFiltersSource filtersSource;
private final FailureHttpResponseComposer unrecoverableFailureHttpResponseComposer;
private final boolean transparent;
@@ -257,6 +259,7 @@ private DefaultHttpProxyServer(ServerGroup serverGroup,
ExceptionHandler proxyToServerExHandler,
RequestTracer requestTracer,
GlobalStateHandler globalStateHandler,
+ ExecutorService payloadProcessorExecutor,
HttpFiltersSource filtersSource,
FailureHttpResponseComposer unrecoverableFailureHttpResponseComposer,
boolean transparent,
@@ -285,6 +288,7 @@ private DefaultHttpProxyServer(ServerGroup serverGroup,
this.proxyToServerExHandler = proxyToServerExHandler;
this.requestTracer = requestTracer;
this.globalStateHandler = globalStateHandler;
+ this.payloadProcessorExecutor = payloadProcessorExecutor;
this.filtersSource = filtersSource;
this.unrecoverableFailureHttpResponseComposer = unrecoverableFailureHttpResponseComposer;
this.transparent = transparent;
@@ -621,6 +625,10 @@ protected GlobalStateHandler getGlobalStateHandler() {
return globalStateHandler;
}
+ protected ExecutorService getPayloadProcessorExecutor() {
+ return payloadProcessorExecutor;
+ }
+
protected RequestTracer getRequestTracer() {
return requestTracer;
}
@@ -671,6 +679,7 @@ private static class DefaultHttpProxyServerBootstrap implements HttpProxyServerB
private ExceptionHandler proxyToServerExHandler = null;
private RequestTracer requestTracer = null;
private GlobalStateHandler globalStateHandler = null;
+ private ExecutorService payloadProcessorExecutor = null;
private HttpFiltersSource filtersSource = new HttpFiltersSourceAdapter();
private FailureHttpResponseComposer unrecoverableFailureHttpResponseComposer = new DefaultFailureHttpResponseComposer();
private boolean transparent = false;
@@ -896,6 +905,13 @@ public HttpProxyServerBootstrap withCustomGlobalState(
return this;
}
+ @Override
+ public HttpProxyServerBootstrap withPyaloadProcessorExecutor(
+ ExecutorService payloadProcessorExecutor) {
+ this.payloadProcessorExecutor = payloadProcessorExecutor;
+ return this;
+ }
+
@Override
public HttpProxyServerBootstrap withFiltersSource(
HttpFiltersSource filtersSource) {
@@ -1011,14 +1027,15 @@ private DefaultHttpProxyServer build() {
serverGroup = this.serverGroup;
}
else {
- serverGroup = new ServerGroup(name, clientToProxyAcceptorThreads, clientToProxyWorkerThreads, proxyToServerWorkerThreads);
+ serverGroup = new ServerGroup(name, clientToProxyAcceptorThreads,
+ clientToProxyWorkerThreads, proxyToServerWorkerThreads, payloadProcessorExecutor);
}
return new DefaultHttpProxyServer(serverGroup,
transportProtocol, determineListenAddress(),
sslEngineSource, authenticateSslClients,
proxyAuthenticator, chainProxyManager, mitmManagerFactory,
- clientToProxyExHandler, proxyToServerExHandler, requestTracer, globalStateHandler,
+ clientToProxyExHandler, proxyToServerExHandler, requestTracer, globalStateHandler, payloadProcessorExecutor,
filtersSource, unrecoverableFailureHttpResponseComposer, transparent,
idleConnectionTimeout, activityTrackers, connectTimeout,
serverResolver, readThrottleBytesPerSecond, writeThrottleBytesPerSecond,
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index c84395011..63b5677bd 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -54,8 +54,6 @@
import java.net.UnknownHostException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import static org.littleshoot.proxy.impl.ConnectionState.AWAITING_CHUNK;
@@ -256,7 +254,8 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, HttpRespons
((ReferenceCounted) resp).retain();
}
- executor.execute(clientConnection.wrapTask(() -> {
+ proxyServer.getPayloadProcessorExecutor()
+ .execute(clientConnection.wrapTask(() -> {
currentFilters.serverToProxyResponseReceived();
respondWith(resp);
}));
@@ -264,8 +263,6 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, HttpRespons
}
}
- Executor executor = Executors.newCachedThreadPool();
-
@Override
protected void readHTTPChunk(HttpContent chunk) {
respondWith(chunk);
diff --git a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
index d359c7be1..90ad3a3e9 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
@@ -12,6 +12,8 @@
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -72,6 +74,8 @@ public class ServerGroup {
*/
private final EnumMap protocolThreadPools = new EnumMap(TransportProtocol.class);
+ private final ExecutorService payloadProcessingExecutor;
+
/**
* A mapping of selector providers to transport protocols. Avoids special-casing each transport protocol during
* transport protocol initialization.
@@ -104,12 +108,19 @@ public class ServerGroup {
* @param incomingWorkerThreads number of client-to-proxy worker threads per protocol
* @param outgoingWorkerThreads number of proxy-to-server worker threads per protocol
*/
- public ServerGroup(String name, int incomingAcceptorThreads, int incomingWorkerThreads, int outgoingWorkerThreads) {
+ public ServerGroup(String name, int incomingAcceptorThreads,
+ int incomingWorkerThreads, int outgoingWorkerThreads,
+ ExecutorService payloadProcessingExecutor) {
this.name = name;
this.serverGroupId = serverGroupCount.getAndIncrement();
this.incomingAcceptorThreads = incomingAcceptorThreads;
this.incomingWorkerThreads = incomingWorkerThreads;
this.outgoingWorkerThreads = outgoingWorkerThreads;
+ if (payloadProcessingExecutor == null) {
+ this.payloadProcessingExecutor = Executors.newFixedThreadPool(incomingWorkerThreads);
+ } else {
+ this.payloadProcessingExecutor = payloadProcessingExecutor;
+ }
}
/**
@@ -219,6 +230,14 @@ private void shutdown(boolean graceful) {
allEventLoopGroups.addAll(threadPools.getAllEventLoops());
}
+ payloadProcessingExecutor.shutdown();
+
+ try {
+ payloadProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Failed to shutdown payload processing executor properly", e);
+ }
+
for (EventLoopGroup group : allEventLoopGroups) {
if (graceful) {
group.shutdownGracefully();
From 087d4ae74af6c9c4bf248f10f235754e7a725048 Mon Sep 17 00:00:00 2001
From: Slava Fomin
Date: Sat, 5 Jan 2019 00:20:57 +0200
Subject: [PATCH 18/43] build fix
---
.../proxy/impl/ClientToProxyConnection.java | 4 +--
.../proxy/impl/DefaultHttpProxyServer.java | 2 +-
.../proxy/impl/ProxyConnection.java | 30 ++++++++++---------
.../littleshoot/proxy/impl/ServerGroup.java | 4 +++
4 files changed, 23 insertions(+), 17 deletions(-)
diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
index f592a754a..19bc26337 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
@@ -352,7 +352,7 @@ public ConnectionState doReadHTTPInitial(HttpRequest httpRequest) {
}
@Sharable
- protected class ClientToProxyProcessor extends ChannelDuplexHandler {
+ protected class ClientPayloadProcessor extends ChannelDuplexHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
@@ -922,7 +922,7 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
pipeline.addLast(globalStateWrapperEvenLoop, "handlerBegin", this);
- pipeline.addLast(globalStateWrapperEvenLoop, "clientToProxyProcessor", new ClientToProxyProcessor());
+ pipeline.addLast(globalStateWrapperEvenLoop, "clientToProxyProcessor", new ClientPayloadProcessor());
pipeline.addLast(globalStateWrapperEvenLoop, "handlerEnd", this);
diff --git a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
index 753e4d99e..9f212c9fa 100644
--- a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
+++ b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
@@ -626,7 +626,7 @@ protected GlobalStateHandler getGlobalStateHandler() {
}
protected ExecutorService getPayloadProcessorExecutor() {
- return payloadProcessorExecutor;
+ return serverGroup.getPayloadProcessingExecutor();
}
protected RequestTracer getRequestTracer() {
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
index d31fb48a0..d92f1a473 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
@@ -133,11 +133,7 @@ private void readHTTP(ChannelHandlerContext ctx, HttpObject httpObject) {
switch (getCurrentState()) {
case AWAITING_INITIAL:
if (httpObject instanceof HttpMessage) {
- if (ctx.name().equals("handlerEnd")) {
- nextState = ((ClientToProxyConnection)this).doReadHTTPInitial((HttpRequest) httpObject);
- } else {
- nextState = readHTTPInitial(ctx, (I) httpObject);
- }
+ nextState = processPayload(ctx, (I) httpObject);
} else {
// Similar to the AWAITING_PROXY_AUTHENTICATION case below, we may enter an AWAITING_INITIAL
// state if the proxy responded to an earlier request with a 502 or 504 response, or a short-circuit
@@ -155,11 +151,7 @@ private void readHTTP(ChannelHandlerContext ctx, HttpObject httpObject) {
case AWAITING_PROXY_AUTHENTICATION:
if (httpObject instanceof HttpRequest) {
// Once we get an HttpRequest, try to process it as usual
- if (ctx.name().equals("handlerEnd")) {
- nextState = ((ClientToProxyConnection)this).doReadHTTPInitial((HttpRequest) httpObject);
- } else {
- nextState = readHTTPInitial(ctx, (I) httpObject);
- }
+ nextState = processPayload(ctx, (I) httpObject);
} else {
// Anything that's not an HttpRequest that came in while
// we're pending authentication gets dropped on the floor. This
@@ -190,6 +182,20 @@ private void readHTTP(ChannelHandlerContext ctx, HttpObject httpObject) {
become(nextState);
}
+ private ConnectionState processPayload(ChannelHandlerContext ctx, I httpObject) {
+ ConnectionState nextState;
+ if (afterPayloadProcessor(ctx)) {
+ nextState = ((ClientToProxyConnection) this).doReadHTTPInitial((HttpRequest) httpObject);
+ } else {
+ nextState = readHTTPInitial(ctx, httpObject);
+ }
+ return nextState;
+ }
+
+ private boolean afterPayloadProcessor(ChannelHandlerContext ctx) {
+ return ctx.name().equals("handlerEnd");
+ }
+
/**
* Implement this to handle reading the initial object (e.g.
* {@link HttpRequest} or {@link HttpResponse}).
@@ -541,10 +547,6 @@ protected void become(ConnectionState state) {
this.currentState = state;
}
- protected ConnectionState state() {
- return this.currentState;
- }
-
protected ConnectionState getCurrentState() {
return currentState;
}
diff --git a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
index 90ad3a3e9..929c199f1 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
@@ -300,6 +300,10 @@ public EventLoopGroup getProxyToServerWorkerPoolForTransport(TransportProtocol p
return getThreadPoolsForProtocol(protocol).getProxyToServerWorkerPool();
}
+ public ExecutorService getPayloadProcessingExecutor() {
+ return payloadProcessingExecutor;
+ }
+
/**
* @return true if this ServerGroup has already been stopped
*/
From cddbc1e5feea0ae34c15beb7c479a752c4068727 Mon Sep 17 00:00:00 2001
From: Slava Fomin
Date: Mon, 7 Jan 2019 00:29:10 +0200
Subject: [PATCH 19/43] Cached default executor, correct statee
---
.../proxy/impl/ClientToProxyConnection.java | 15 ++++-----------
.../proxy/impl/ProxyToServerConnection.java | 7 ++++---
.../org/littleshoot/proxy/impl/ServerGroup.java | 2 +-
3 files changed, 9 insertions(+), 15 deletions(-)
diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
index 19bc26337..17104b126 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
@@ -3,11 +3,10 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
@@ -145,7 +144,7 @@ public class ClientToProxyConnection extends ProxyConnection {
private AtomicBoolean authenticated = new AtomicBoolean();
- public HttpResponse clientToProxyResponse;
+ private HttpResponse clientToProxyResponse;
private final GlobalTrafficShapingHandler globalTrafficShapingHandler;
@@ -352,10 +351,10 @@ public ConnectionState doReadHTTPInitial(HttpRequest httpRequest) {
}
@Sharable
- protected class ClientPayloadProcessor extends ChannelDuplexHandler {
+ protected class ClientPayloadProcessor extends ChannelInboundHandlerAdapter {
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ReferenceCounted) {
LOG.debug("Retaining reference counted message");
@@ -413,12 +412,6 @@ private void process(ChannelHandlerContext ctx, Object msg, HttpRequest httpRequ
ctx.fireChannelRead(httpRequest);
}
}
-
- @Override
- public void write(ChannelHandlerContext ctx,
- Object msg, ChannelPromise promise) throws Exception {
- super.write(ctx, msg, promise);
- }
}
public class GlobalStateWrapperEvenLoop extends DefaultEventLoop {
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index 63b5677bd..e41907f7b 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -256,10 +256,11 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, HttpRespons
proxyServer.getPayloadProcessorExecutor()
.execute(clientConnection.wrapTask(() -> {
- currentFilters.serverToProxyResponseReceived();
respondWith(resp);
- }));
- return AWAITING_INITIAL;
+ currentFilters.serverToProxyResponseReceived();
+ super.become(AWAITING_INITIAL);
+ }));
+ return getCurrentState();
}
}
diff --git a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
index 929c199f1..144b0f75e 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
@@ -117,7 +117,7 @@ public ServerGroup(String name, int incomingAcceptorThreads,
this.incomingWorkerThreads = incomingWorkerThreads;
this.outgoingWorkerThreads = outgoingWorkerThreads;
if (payloadProcessingExecutor == null) {
- this.payloadProcessingExecutor = Executors.newFixedThreadPool(incomingWorkerThreads);
+ this.payloadProcessingExecutor = Executors.newCachedThreadPool();
} else {
this.payloadProcessingExecutor = payloadProcessingExecutor;
}
From 35346f92ea6ae3192a84ebbb17e362d045dc80b8 Mon Sep 17 00:00:00 2001
From: Slava Fomin
Date: Mon, 7 Jan 2019 00:49:20 +0200
Subject: [PATCH 20/43] Test build 1
---
.../org/littleshoot/proxy/impl/ServerGroup.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
index 144b0f75e..8dba1af42 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
@@ -230,13 +230,13 @@ private void shutdown(boolean graceful) {
allEventLoopGroups.addAll(threadPools.getAllEventLoops());
}
- payloadProcessingExecutor.shutdown();
-
- try {
- payloadProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- log.warn("Failed to shutdown payload processing executor properly", e);
- }
+// payloadProcessingExecutor.shutdown();
+//
+// try {
+// payloadProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS);
+// } catch (InterruptedException e) {
+// log.warn("Failed to shutdown payload processing executor properly", e);
+// }
for (EventLoopGroup group : allEventLoopGroups) {
if (graceful) {
From beee58a5b50b6518b99abfbbe637dd1e627e52ab Mon Sep 17 00:00:00 2001
From: Slava Fomin
Date: Mon, 7 Jan 2019 09:49:40 +0200
Subject: [PATCH 21/43] Try fixing build
---
.../proxy/impl/ProxyToServerConnection.java | 11 ++++++-----
.../org/littleshoot/proxy/impl/ServerGroup.java | 14 +++++++-------
2 files changed, 13 insertions(+), 12 deletions(-)
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index e41907f7b..c3d94722b 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -254,13 +254,14 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, HttpRespons
((ReferenceCounted) resp).retain();
}
- proxyServer.getPayloadProcessorExecutor()
- .execute(clientConnection.wrapTask(() -> {
+// proxyServer.getPayloadProcessorExecutor()
+// .execute(clientConnection.wrapTask(() -> {
respondWith(resp);
currentFilters.serverToProxyResponseReceived();
- super.become(AWAITING_INITIAL);
- }));
- return getCurrentState();
+// super.become(AWAITING_INITIAL);
+// }));
+// return getCurrentState();
+ return AWAITING_INITIAL;
}
}
diff --git a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
index 8dba1af42..144b0f75e 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
@@ -230,13 +230,13 @@ private void shutdown(boolean graceful) {
allEventLoopGroups.addAll(threadPools.getAllEventLoops());
}
-// payloadProcessingExecutor.shutdown();
-//
-// try {
-// payloadProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS);
-// } catch (InterruptedException e) {
-// log.warn("Failed to shutdown payload processing executor properly", e);
-// }
+ payloadProcessingExecutor.shutdown();
+
+ try {
+ payloadProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Failed to shutdown payload processing executor properly", e);
+ }
for (EventLoopGroup group : allEventLoopGroups) {
if (graceful) {
From 44a32786419fbfe0fa960890b50902c2d3563a78 Mon Sep 17 00:00:00 2001
From: Slava Fomin
Date: Mon, 7 Jan 2019 10:26:12 +0200
Subject: [PATCH 22/43] Try fixing build 3
---
.../proxy/impl/ClientToProxyConnection.java | 14 +++++++-------
.../proxy/impl/ProxyToServerConnection.java | 10 +++++-----
2 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
index 17104b126..88dcdfdbe 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
@@ -887,8 +887,8 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
EventLoopGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(channel.eventLoop());
- pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
- pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
+ pipeline.addLast( "bytesReadMonitor", bytesReadMonitor);
+ pipeline.addLast( "bytesWrittenMonitor", bytesWrittenMonitor);
pipeline.addLast("encoder", new HttpResponseEncoder());
// We want to allow longer request lines, headers, and chunks
@@ -905,19 +905,19 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
aggregateContentForFiltering(pipeline, numberOfBytesToBuffer);
}
- pipeline.addLast(globalStateWrapperEvenLoop, "requestReadMonitor", requestReadMonitor);
- pipeline.addLast(globalStateWrapperEvenLoop, "responseWrittenMonitor", responseWrittenMonitor);
+ pipeline.addLast( "requestReadMonitor", requestReadMonitor);
+ pipeline.addLast( "responseWrittenMonitor", responseWrittenMonitor);
pipeline.addLast(
"idle",
new IdleStateHandler(0, 0, proxyServer
.getIdleConnectionTimeout()));
- pipeline.addLast(globalStateWrapperEvenLoop, "handlerBegin", this);
+ pipeline.addLast( "handlerBegin", this);
- pipeline.addLast(globalStateWrapperEvenLoop, "clientToProxyProcessor", new ClientPayloadProcessor());
+ pipeline.addLast( "clientToProxyProcessor", new ClientPayloadProcessor());
- pipeline.addLast(globalStateWrapperEvenLoop, "handlerEnd", this);
+ pipeline.addLast( "handlerEnd", this);
}
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index c3d94722b..31e794c78 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -917,8 +917,8 @@ private void initChannelPipeline(ChannelPipeline pipeline,
EventLoopGroup globalStateWrapperEvenLoop = clientConnection.new GlobalStateWrapperEvenLoop(channel.eventLoop());
- pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
- pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
+ pipeline.addLast( "bytesReadMonitor", bytesReadMonitor);
+ pipeline.addLast( "bytesWrittenMonitor", bytesWrittenMonitor);
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("decoder", new HeadAwareHttpResponseDecoder(
@@ -933,8 +933,8 @@ private void initChannelPipeline(ChannelPipeline pipeline,
aggregateContentForFiltering(pipeline, numberOfBytesToBuffer);
}
- pipeline.addLast(globalStateWrapperEvenLoop, "responseReadMonitor", responseReadMonitor);
- pipeline.addLast(globalStateWrapperEvenLoop, "requestWrittenMonitor", requestWrittenMonitor);
+ pipeline.addLast( "responseReadMonitor", responseReadMonitor);
+ pipeline.addLast( "requestWrittenMonitor", requestWrittenMonitor);
// Set idle timeout
pipeline.addLast(
@@ -942,7 +942,7 @@ private void initChannelPipeline(ChannelPipeline pipeline,
new IdleStateHandler(0, 0, proxyServer
.getIdleConnectionTimeout()));
- pipeline.addLast(globalStateWrapperEvenLoop, "handler", this);
+ pipeline.addLast( "handler", this);
}
/**
From 006935eecec63371ed693f13c8db89bf04f6bd71 Mon Sep 17 00:00:00 2001
From: Slava Fomin
Date: Mon, 7 Jan 2019 10:31:45 +0200
Subject: [PATCH 23/43] Uncomment async response
---
.../proxy/impl/ProxyToServerConnection.java | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index 31e794c78..97622a3d5 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -254,14 +254,13 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, HttpRespons
((ReferenceCounted) resp).retain();
}
-// proxyServer.getPayloadProcessorExecutor()
-// .execute(clientConnection.wrapTask(() -> {
+ proxyServer.getPayloadProcessorExecutor()
+ .execute(clientConnection.wrapTask(() -> {
respondWith(resp);
currentFilters.serverToProxyResponseReceived();
-// super.become(AWAITING_INITIAL);
-// }));
-// return getCurrentState();
- return AWAITING_INITIAL;
+ super.become(AWAITING_INITIAL);
+ }));
+ return getCurrentState();
}
}
From bfe4ee1be81fd781c49a0706f81a48d53306ca0d Mon Sep 17 00:00:00 2001
From: Slava Fomin
Date: Mon, 7 Jan 2019 10:46:01 +0200
Subject: [PATCH 24/43] Try adding global state wrapper
---
.../littleshoot/proxy/impl/ClientToProxyConnection.java | 8 ++++----
.../littleshoot/proxy/impl/ProxyToServerConnection.java | 8 ++++----
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
index 88dcdfdbe..75868aaae 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
@@ -887,8 +887,8 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
EventLoopGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(channel.eventLoop());
- pipeline.addLast( "bytesReadMonitor", bytesReadMonitor);
- pipeline.addLast( "bytesWrittenMonitor", bytesWrittenMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
pipeline.addLast("encoder", new HttpResponseEncoder());
// We want to allow longer request lines, headers, and chunks
@@ -905,8 +905,8 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
aggregateContentForFiltering(pipeline, numberOfBytesToBuffer);
}
- pipeline.addLast( "requestReadMonitor", requestReadMonitor);
- pipeline.addLast( "responseWrittenMonitor", responseWrittenMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "requestReadMonitor", requestReadMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "responseWrittenMonitor", responseWrittenMonitor);
pipeline.addLast(
"idle",
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index 97622a3d5..959ad2694 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -916,8 +916,8 @@ private void initChannelPipeline(ChannelPipeline pipeline,
EventLoopGroup globalStateWrapperEvenLoop = clientConnection.new GlobalStateWrapperEvenLoop(channel.eventLoop());
- pipeline.addLast( "bytesReadMonitor", bytesReadMonitor);
- pipeline.addLast( "bytesWrittenMonitor", bytesWrittenMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("decoder", new HeadAwareHttpResponseDecoder(
@@ -932,8 +932,8 @@ private void initChannelPipeline(ChannelPipeline pipeline,
aggregateContentForFiltering(pipeline, numberOfBytesToBuffer);
}
- pipeline.addLast( "responseReadMonitor", responseReadMonitor);
- pipeline.addLast( "requestWrittenMonitor", requestWrittenMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "responseReadMonitor", responseReadMonitor);
+ pipeline.addLast(globalStateWrapperEvenLoop, "requestWrittenMonitor", requestWrittenMonitor);
// Set idle timeout
pipeline.addLast(
From 7cf431318dd08331b61f7e062fef1e1734c23d0b Mon Sep 17 00:00:00 2001
From: Slava Fomin
Date: Mon, 7 Jan 2019 14:34:33 +0200
Subject: [PATCH 25/43] Improve global state wrapper
---
.../proxy/impl/ClientToProxyConnection.java | 46 ++---
.../impl/GlobalStateWrapperEvenLoop.java | 187 ++++++++++++++++++
.../proxy/impl/ProxyToServerConnection.java | 15 +-
3 files changed, 214 insertions(+), 34 deletions(-)
create mode 100644 src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java
diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
index 75868aaae..603b15293 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
@@ -7,9 +7,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.DefaultEventLoop;
-import io.netty.channel.EventLoop;
-import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
@@ -27,8 +24,10 @@
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.ReferenceCounted;
+import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
+
import org.apache.commons.lang3.StringUtils;
import org.littleshoot.proxy.ActivityTracker;
import org.littleshoot.proxy.DefaultFailureHttpResponseComposer;
@@ -414,30 +413,7 @@ private void process(ChannelHandlerContext ctx, Object msg, HttpRequest httpRequ
}
}
- public class GlobalStateWrapperEvenLoop extends DefaultEventLoop {
-
- private final EventLoop eventLoop;
-
- GlobalStateWrapperEvenLoop(EventLoop eventLoop) {
- this.eventLoop = eventLoop;
- }
-
- @Override
- public void execute(Runnable task) {
- if (eventLoop.inEventLoop()) {
- wrapTask(task).run();
- } else {
- eventLoop.execute(wrapTask(task));
- }
- }
-
- @Override
- public boolean inEventLoop() {
- return false;
- }
- }
-
- Runnable wrapTask(Runnable task) {
+ public Runnable wrapTask(Runnable task) {
return () -> {
if (proxyServer.getGlobalStateHandler() != null) {
try {
@@ -885,7 +861,11 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
pipeline.addLast("requestTracerHandler", new RequestTracerHandler(this));
}
- EventLoopGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(channel.eventLoop());
+// if (proxyServer.getGlobalStateHandler() != null) {
+// pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(this));
+// }
+
+ EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(this);
pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
@@ -913,11 +893,15 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
new IdleStateHandler(0, 0, proxyServer
.getIdleConnectionTimeout()));
- pipeline.addLast( "handlerBegin", this);
+// if (proxyServer.getGlobalStateHandler() != null) {
+// pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(this));
+// }
+
+ pipeline.addLast(globalStateWrapperEvenLoop, "handlerBegin", this);
- pipeline.addLast( "clientToProxyProcessor", new ClientPayloadProcessor());
+ pipeline.addLast(globalStateWrapperEvenLoop, "clientToProxyProcessor", new ClientPayloadProcessor());
- pipeline.addLast( "handlerEnd", this);
+ pipeline.addLast(globalStateWrapperEvenLoop, "handlerEnd", this);
}
diff --git a/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java b/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java
new file mode 100644
index 000000000..211f00d4e
--- /dev/null
+++ b/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java
@@ -0,0 +1,187 @@
+package org.littleshoot.proxy.impl;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ProgressivePromise;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+
+public class GlobalStateWrapperEvenLoop implements EventExecutor {
+
+ private final ClientToProxyConnection connection;
+
+ private final EventExecutor eventLoop;
+
+ GlobalStateWrapperEvenLoop(ClientToProxyConnection connection) {
+ this.connection = connection;
+ this.eventLoop = connection.channel.eventLoop();
+ }
+
+ GlobalStateWrapperEvenLoop(ClientToProxyConnection connection, EventExecutor eventLoop) {
+ this.connection = connection;
+ this.eventLoop = eventLoop;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ if (eventLoop.inEventLoop()) {
+ connection.wrapTask(command).run();
+ } else {
+ eventLoop.execute(connection.wrapTask(command));
+ }
+ }
+
+ @Override
+ public boolean isShuttingDown() {
+ return eventLoop.isShuttingDown();
+ }
+
+ @Override
+ public Future> shutdownGracefully() {
+ return eventLoop.shutdownGracefully();
+ }
+
+ @Override
+ public Future> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
+ return eventLoop.shutdownGracefully(quietPeriod, timeout, unit);
+ }
+
+ @Override
+ public Future> terminationFuture() {
+ return eventLoop.terminationFuture();
+ }
+
+ @Override
+ public void shutdown() {
+ eventLoop.terminationFuture();
+ }
+
+ @Override
+ public List shutdownNow() {
+ return eventLoop.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return eventLoop.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return eventLoop.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return eventLoop.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public EventExecutor next() {
+ return this;
+ }
+
+ @Override
+ public Iterator iterator() {
+ return eventLoop.iterator();
+ }
+
+ @Override
+ public Future> submit(Runnable task) {
+ return eventLoop.submit(connection.wrapTask(task));
+ }
+
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks) throws InterruptedException {
+ return eventLoop.invokeAll(tasks);
+ }
+
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ return eventLoop.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public T invokeAny(Collection extends Callable> tasks) throws InterruptedException, ExecutionException {
+ return eventLoop.invokeAny(tasks);
+ }
+
+ @Override
+ public T invokeAny(Collection extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return eventLoop.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public Future submit(Runnable task, T result) {
+ return eventLoop.submit(connection.wrapTask(task), result);
+ }
+
+ @Override
+ public Future submit(Callable task) {
+ return eventLoop.submit(task);
+ }
+
+ @Override
+ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) {
+ return eventLoop.schedule(command, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) {
+ return eventLoop.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return eventLoop.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return eventLoop.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+ @Override
+ public EventExecutorGroup parent() {
+ return eventLoop.parent();
+ }
+
+ @Override
+ public boolean inEventLoop() {
+ return false;
+ }
+
+ @Override
+ public boolean inEventLoop(Thread thread) {
+ return eventLoop.inEventLoop(thread);
+ }
+
+ @Override
+ public Promise newPromise() {
+ return eventLoop.newPromise();
+ }
+
+ @Override
+ public ProgressivePromise newProgressivePromise() {
+ return eventLoop.newProgressivePromise();
+ }
+
+ @Override
+ public Future newSucceededFuture(V result) {
+ return eventLoop.newSucceededFuture(result);
+ }
+
+ @Override
+ public Future newFailedFuture(Throwable cause) {
+ return eventLoop.newFailedFuture(cause);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index 959ad2694..36d810856 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -12,7 +12,6 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.codec.http.FullHttpResponse;
@@ -33,6 +32,7 @@
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCounted;
+import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.littleshoot.proxy.ActivityTracker;
@@ -910,11 +910,16 @@ private void setupConnectionParameters() throws UnknownHostException {
*/
private void initChannelPipeline(ChannelPipeline pipeline,
Channel channel) {
+
+// if (proxyServer.getGlobalStateHandler() != null) {
+// pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(clientConnection));
+// }
+
if (trafficHandler != null) {
pipeline.addLast("global-traffic-shaping", trafficHandler);
}
- EventLoopGroup globalStateWrapperEvenLoop = clientConnection.new GlobalStateWrapperEvenLoop(channel.eventLoop());
+ EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(clientConnection, channel.eventLoop());
pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
@@ -941,7 +946,11 @@ private void initChannelPipeline(ChannelPipeline pipeline,
new IdleStateHandler(0, 0, proxyServer
.getIdleConnectionTimeout()));
- pipeline.addLast( "handler", this);
+// if (proxyServer.getGlobalStateHandler() != null) {
+// pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(clientConnection));
+// }
+
+ pipeline.addLast(globalStateWrapperEvenLoop, "handler", this);
}
/**
From c99ca94c1fe77aa510ddb14c1fadeae84d1e16f0 Mon Sep 17 00:00:00 2001
From: Slava Fomin
Date: Mon, 7 Jan 2019 14:47:19 +0200
Subject: [PATCH 26/43] Remove commented code
---
.../proxy/impl/ClientToProxyConnection.java | 10 +---------
.../proxy/impl/ProxyToServerConnection.java | 10 +---------
2 files changed, 2 insertions(+), 18 deletions(-)
diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
index 603b15293..48a26b69b 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
@@ -861,11 +861,7 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
pipeline.addLast("requestTracerHandler", new RequestTracerHandler(this));
}
-// if (proxyServer.getGlobalStateHandler() != null) {
-// pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(this));
-// }
-
- EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(this);
+ final EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(this);
pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
@@ -893,10 +889,6 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
new IdleStateHandler(0, 0, proxyServer
.getIdleConnectionTimeout()));
-// if (proxyServer.getGlobalStateHandler() != null) {
-// pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(this));
-// }
-
pipeline.addLast(globalStateWrapperEvenLoop, "handlerBegin", this);
pipeline.addLast(globalStateWrapperEvenLoop, "clientToProxyProcessor", new ClientPayloadProcessor());
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index 36d810856..ecbe40210 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -911,15 +911,11 @@ private void setupConnectionParameters() throws UnknownHostException {
private void initChannelPipeline(ChannelPipeline pipeline,
Channel channel) {
-// if (proxyServer.getGlobalStateHandler() != null) {
-// pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(clientConnection));
-// }
-
if (trafficHandler != null) {
pipeline.addLast("global-traffic-shaping", trafficHandler);
}
- EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(clientConnection, channel.eventLoop());
+ final EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(clientConnection, channel.eventLoop());
pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
@@ -946,10 +942,6 @@ private void initChannelPipeline(ChannelPipeline pipeline,
new IdleStateHandler(0, 0, proxyServer
.getIdleConnectionTimeout()));
-// if (proxyServer.getGlobalStateHandler() != null) {
-// pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(clientConnection));
-// }
-
pipeline.addLast(globalStateWrapperEvenLoop, "handler", this);
}
From ca235d5c18ac08f24b3a2ab5738df037e0bdd83a Mon Sep 17 00:00:00 2001
From: Slava Fomin
Date: Mon, 7 Jan 2019 20:07:33 +0200
Subject: [PATCH 27/43] Returns InboundGlobalStateHandler back
---
.../proxy/impl/ClientToProxyConnection.java | 10 +++++++++-
.../proxy/impl/GlobalStateWrapperEvenLoop.java | 8 ++------
.../proxy/impl/ProxyToServerConnection.java | 8 ++++++++
3 files changed, 19 insertions(+), 7 deletions(-)
diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
index 48a26b69b..2819349d6 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
@@ -861,7 +861,11 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
pipeline.addLast("requestTracerHandler", new RequestTracerHandler(this));
}
- final EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(this);
+ if (proxyServer.getGlobalStateHandler() != null) {
+ pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(this));
+ }
+
+ EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(this);
pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
@@ -889,6 +893,10 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
new IdleStateHandler(0, 0, proxyServer
.getIdleConnectionTimeout()));
+ if (proxyServer.getGlobalStateHandler() != null) {
+ pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(this));
+ }
+
pipeline.addLast(globalStateWrapperEvenLoop, "handlerBegin", this);
pipeline.addLast(globalStateWrapperEvenLoop, "clientToProxyProcessor", new ClientPayloadProcessor());
diff --git a/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java b/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java
index 211f00d4e..5276275e6 100644
--- a/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java
+++ b/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java
@@ -33,11 +33,7 @@ public class GlobalStateWrapperEvenLoop implements EventExecutor {
@Override
public void execute(Runnable command) {
- if (eventLoop.inEventLoop()) {
- connection.wrapTask(command).run();
- } else {
- eventLoop.execute(connection.wrapTask(command));
- }
+ eventLoop.execute(connection.wrapTask(command));
}
@Override
@@ -157,7 +153,7 @@ public EventExecutorGroup parent() {
@Override
public boolean inEventLoop() {
- return false;
+ return eventLoop.inEventLoop();
}
@Override
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index ecbe40210..77cfab055 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -911,6 +911,10 @@ private void setupConnectionParameters() throws UnknownHostException {
private void initChannelPipeline(ChannelPipeline pipeline,
Channel channel) {
+ if (proxyServer.getGlobalStateHandler() != null) {
+ pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(clientConnection));
+ }
+
if (trafficHandler != null) {
pipeline.addLast("global-traffic-shaping", trafficHandler);
}
@@ -942,6 +946,10 @@ private void initChannelPipeline(ChannelPipeline pipeline,
new IdleStateHandler(0, 0, proxyServer
.getIdleConnectionTimeout()));
+ if (proxyServer.getGlobalStateHandler() != null) {
+ pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(clientConnection));
+ }
+
pipeline.addLast(globalStateWrapperEvenLoop, "handler", this);
}
From a73d0bf51ddfbff56a253c84baf127415b4ebf91 Mon Sep 17 00:00:00 2001
From: Slava Fomin
Date: Fri, 11 Jan 2019 00:31:52 +0200
Subject: [PATCH 28/43] Code refactoring
---
.../proxy/HttpProxyServerBootstrap.java | 6 +-
.../proxy/impl/ClientToProxyConnection.java | 139 +++++++++---------
.../proxy/impl/DefaultHttpProxyServer.java | 8 +-
.../impl/GlobalStateWrapperEvenLoop.java | 2 +-
.../proxy/impl/HttpInitialHandler.java | 20 +++
.../proxy/impl/ProxyConnection.java | 22 +--
.../proxy/impl/ProxyToServerConnection.java | 5 +-
.../littleshoot/proxy/impl/ServerGroup.java | 2 +-
.../proxy/impl/UpstreamConnectionHandler.java | 36 +++++
9 files changed, 141 insertions(+), 99 deletions(-)
create mode 100644 src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java
create mode 100644 src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java
diff --git a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
index 3bbc6f910..8764d3521 100644
--- a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
+++ b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
@@ -245,11 +245,11 @@ HttpProxyServerBootstrap withCustomGlobalState(
/**
*
- * @param payloadProcessorExecutor
+ * @param messageProcessorExecutor
* @return
*/
- HttpProxyServerBootstrap withPyaloadProcessorExecutor(
- ExecutorService payloadProcessorExecutor);
+ HttpProxyServerBootstrap withMessageProcessingExecutor(
+ ExecutorService messageProcessorExecutor);
/**
*
diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
index 2819349d6..ae7b11af4 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
@@ -3,7 +3,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
@@ -85,7 +84,6 @@
* .
*
*/
-@Sharable
public class ClientToProxyConnection extends ProxyConnection {
private static final HttpResponseStatus CONNECTION_ESTABLISHED = new HttpResponseStatus(
200, "Connection established");
@@ -143,8 +141,6 @@ public class ClientToProxyConnection extends ProxyConnection {
private AtomicBoolean authenticated = new AtomicBoolean();
- private HttpResponse clientToProxyResponse;
-
private final GlobalTrafficShapingHandler globalTrafficShapingHandler;
ClientToProxyConnection(
@@ -188,7 +184,8 @@ public void operationComplete(
**************************************************************************/
@Override
- protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, HttpRequest httpRequest) {
+ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpRequestObj) {
+ HttpRequest httpRequest = (HttpRequest) httpRequestObj;
LOG.debug("Received raw request: {}", httpRequest);
// if we cannot parse the request, immediately return a 400 and close the connection, since we do not know what state
@@ -229,11 +226,11 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, HttpRequest
* @param httpRequest
* @return
*/
- public ConnectionState doReadHTTPInitial(HttpRequest httpRequest) {
- if (clientToProxyResponse != null) {
- LOG.debug("Responding to client with short-circuit response from filter: {}", clientToProxyResponse);
+ public ConnectionState setupUpstreamConnection(HttpResponse shortCircuitResponse, HttpRequest httpRequest) {
+ if (shortCircuitResponse != null) {
+ LOG.debug("Responding to client with short-circuit response from filter: {}", shortCircuitResponse);
- boolean keepAlive = respondWithShortCircuitResponse(clientToProxyResponse);
+ boolean keepAlive = respondWithShortCircuitResponse(shortCircuitResponse);
if (keepAlive) {
return AWAITING_INITIAL;
} else {
@@ -350,25 +347,24 @@ public ConnectionState doReadHTTPInitial(HttpRequest httpRequest) {
}
@Sharable
- protected class ClientPayloadProcessor extends ChannelInboundHandlerAdapter {
+ protected class ClientToProxyMessageProcessor extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
- if (msg instanceof ReferenceCounted) {
- LOG.debug("Retaining reference counted message");
- ((ReferenceCounted) msg).retain();
- }
-
- HttpRequest httpRequest = (HttpRequest) msg;
+ if (msg instanceof ReferenceCounted) {
+ LOG.debug("Retaining reference counted message");
+ ((ReferenceCounted) msg).retain();
+ }
- if (ProxyUtils.isChunked(httpRequest)) {
- process(ctx, msg, httpRequest);
- } else {
- proxyServer.getPayloadProcessorExecutor()
- .execute(wrapTask(() -> process(ctx, msg, httpRequest)));
- }
+ final HttpRequest httpRequest = (HttpRequest) msg;
+ if (ProxyUtils.isChunked(httpRequest)) {
+ process(ctx, msg, httpRequest);
+ } else {
+ proxyServer.getPayloadProcessorExecutor()
+ .execute(wrapTask(() -> process(ctx, msg, httpRequest)));
+ }
}
private void process(ChannelHandlerContext ctx, Object msg, HttpRequest httpRequest) {
@@ -376,60 +372,59 @@ private void process(ChannelHandlerContext ctx, Object msg, HttpRequest httpRequ
boolean authenticationRequired = authenticationRequired(httpRequest);
if (authenticationRequired) {
- LOG.debug("Not authenticated!!");
- become(AWAITING_PROXY_AUTHENTICATION);
+ LOG.debug("Not authenticated!!");
+ become(AWAITING_PROXY_AUTHENTICATION);
} else {
-
- // Make a copy of the original request
- final HttpRequest currentRequest = copy(httpRequest);
-
- // Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response
- // should not be filtered), fall back to the default no-op filter source.
- HttpFilters filterInstance;
- try {
- filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx);
- } finally {
- // releasing a copied http request
- if (currentRequest instanceof ReferenceCounted) {
- ((ReferenceCounted) currentRequest).release();
+ // Make a copy of the original request
+ final HttpRequest currentRequest = copy(httpRequest);
+
+ // Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response
+ // should not be filtered), fall back to the default no-op filter source.
+ HttpFilters filterInstance;
+ try {
+ filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx);
+ } finally {
+ // releasing a copied http request
+ if (currentRequest instanceof ReferenceCounted) {
+ ((ReferenceCounted) currentRequest).release();
+ }
+ }
+ if (filterInstance != null) {
+ currentFilters = filterInstance;
+ } else {
+ currentFilters = HttpFiltersAdapter.NOOP_FILTER;
}
- }
- if (filterInstance != null) {
- currentFilters = filterInstance;
- } else {
- currentFilters = HttpFiltersAdapter.NOOP_FILTER;
- }
- // Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required
- clientToProxyResponse = currentFilters.clientToProxyRequest(httpRequest);
+ // Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required
+ final HttpResponse shortCircuitResponse = currentFilters.clientToProxyRequest(httpRequest);
- if (msg instanceof ReferenceCounted) {
- LOG.debug("Retaining reference counted message");
- ((ReferenceCounted) msg).retain();
- }
+ if (msg instanceof ReferenceCounted) {
+ LOG.debug("Retaining reference counted message");
+ ((ReferenceCounted) msg).retain();
+ }
- ctx.fireChannelRead(httpRequest);
+ ctx.fireChannelRead(new UpstreamConnectionHandler.Request(httpRequest, shortCircuitResponse));
}
}
}
- public Runnable wrapTask(Runnable task) {
- return () -> {
- if (proxyServer.getGlobalStateHandler() != null) {
- try {
- proxyServer.getGlobalStateHandler().restoreFromChannel(channel);
- } finally {
- try {
- task.run();
- } finally {
- proxyServer.getGlobalStateHandler().clear();
- }
- }
- } else {
- task.run();
- }
- };
- }
+ Runnable wrapTask(Runnable task) {
+ return () -> {
+ if (proxyServer.getGlobalStateHandler() != null) {
+ try {
+ proxyServer.getGlobalStateHandler().restoreFromChannel(channel);
+ } finally {
+ try {
+ task.run();
+ } finally {
+ proxyServer.getGlobalStateHandler().clear();
+ }
+ }
+ } else {
+ task.run();
+ }
+ };
+ }
/**
* Returns true if the specified request is a request to an origin server, rather than to a proxy server. If this
@@ -897,11 +892,11 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(this));
}
- pipeline.addLast(globalStateWrapperEvenLoop, "handlerBegin", this);
-
- pipeline.addLast(globalStateWrapperEvenLoop, "clientToProxyProcessor", new ClientPayloadProcessor());
-
- pipeline.addLast(globalStateWrapperEvenLoop, "handlerEnd", this);
+ pipeline.addLast(globalStateWrapperEvenLoop, "handler", this);
+ HttpInitialHandler httpInitialHandler = new HttpInitialHandler<>(this);
+ pipeline.addLast(globalStateWrapperEvenLoop, "httpInitialHandler", httpInitialHandler);
+ pipeline.addLast(globalStateWrapperEvenLoop, "clientToProxyMessageProcessor", new ClientToProxyMessageProcessor());
+ pipeline.addLast(globalStateWrapperEvenLoop, "upstreamConnectionHandler", new UpstreamConnectionHandler(this));
}
diff --git a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
index 9f212c9fa..637ff0d7d 100644
--- a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
+++ b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
@@ -626,7 +626,7 @@ protected GlobalStateHandler getGlobalStateHandler() {
}
protected ExecutorService getPayloadProcessorExecutor() {
- return serverGroup.getPayloadProcessingExecutor();
+ return serverGroup.getMessageProcessingExecutor();
}
protected RequestTracer getRequestTracer() {
@@ -906,9 +906,9 @@ public HttpProxyServerBootstrap withCustomGlobalState(
}
@Override
- public HttpProxyServerBootstrap withPyaloadProcessorExecutor(
- ExecutorService payloadProcessorExecutor) {
- this.payloadProcessorExecutor = payloadProcessorExecutor;
+ public HttpProxyServerBootstrap withMessageProcessingExecutor(
+ ExecutorService messageProcessorExecutor) {
+ this.payloadProcessorExecutor = messageProcessorExecutor;
return this;
}
diff --git a/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java b/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java
index 5276275e6..f3df53ed9 100644
--- a/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java
+++ b/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java
@@ -58,7 +58,7 @@ public Future> terminationFuture() {
@Override
public void shutdown() {
- eventLoop.terminationFuture();
+ eventLoop.shutdown();
}
@Override
diff --git a/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java b/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java
new file mode 100644
index 000000000..311e11e42
--- /dev/null
+++ b/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java
@@ -0,0 +1,20 @@
+package org.littleshoot.proxy.impl;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.HttpObject;
+
+public class HttpInitialHandler extends SimpleChannelInboundHandler {
+
+ private final ProxyConnection proxyConnection;
+
+ HttpInitialHandler(ProxyConnection proxyConnection) {
+ this.proxyConnection = proxyConnection;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+ final ConnectionState connectionState = proxyConnection.readHTTPInitial(ctx, msg);
+ proxyConnection.become(connectionState);
+ }
+}
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
index d92f1a473..c79227c1f 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
@@ -129,11 +129,10 @@ protected void read(ChannelHandlerContext ctx, Object msg) {
*/
@SuppressWarnings("unchecked")
private void readHTTP(ChannelHandlerContext ctx, HttpObject httpObject) {
- ConnectionState nextState = getCurrentState();
switch (getCurrentState()) {
case AWAITING_INITIAL:
if (httpObject instanceof HttpMessage) {
- nextState = processPayload(ctx, (I) httpObject);
+ ctx.fireChannelRead(httpObject);
} else {
// Similar to the AWAITING_PROXY_AUTHENTICATION case below, we may enter an AWAITING_INITIAL
// state if the proxy responded to an earlier request with a 502 or 504 response, or a short-circuit
@@ -145,13 +144,13 @@ private void readHTTP(ChannelHandlerContext ctx, HttpObject httpObject) {
case AWAITING_CHUNK:
HttpContent chunk = (HttpContent) httpObject;
readHTTPChunk(chunk);
- nextState = ProxyUtils.isLastChunk(chunk) ? AWAITING_INITIAL
- : AWAITING_CHUNK;
+ become(ProxyUtils.isLastChunk(chunk) ? AWAITING_INITIAL
+ : AWAITING_CHUNK);
break;
case AWAITING_PROXY_AUTHENTICATION:
if (httpObject instanceof HttpRequest) {
// Once we get an HttpRequest, try to process it as usual
- nextState = processPayload(ctx, (I) httpObject);
+ ctx.fireChannelRead(httpObject);
} else {
// Anything that's not an HttpRequest that came in while
// we're pending authentication gets dropped on the floor. This
@@ -179,17 +178,6 @@ private void readHTTP(ChannelHandlerContext ctx, HttpObject httpObject) {
LOG.info("Ignoring message since the connection is closed or about to close");
break;
}
- become(nextState);
- }
-
- private ConnectionState processPayload(ChannelHandlerContext ctx, I httpObject) {
- ConnectionState nextState;
- if (afterPayloadProcessor(ctx)) {
- nextState = ((ClientToProxyConnection) this).doReadHTTPInitial((HttpRequest) httpObject);
- } else {
- nextState = readHTTPInitial(ctx, httpObject);
- }
- return nextState;
}
private boolean afterPayloadProcessor(ChannelHandlerContext ctx) {
@@ -203,7 +191,7 @@ private boolean afterPayloadProcessor(ChannelHandlerContext ctx) {
* @param httpObject
* @return
*/
- protected abstract ConnectionState readHTTPInitial(ChannelHandlerContext ctx, I httpObject);
+ protected abstract ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpObject);
/**
* Implement this to handle reading a chunk in a chunked transfer.
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index 77cfab055..c159681f0 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -223,7 +223,8 @@ protected void read(ChannelHandlerContext ctx, Object msg) {
}
@Override
- protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, HttpResponse httpResponse) {
+ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpResponseObj) {
+ HttpResponse httpResponse = (HttpResponse) httpResponseObj;
LOG.debug("Received raw response: {}", httpResponse);
if (httpResponse.getDecoderResult().isFailure()) {
@@ -951,6 +952,8 @@ private void initChannelPipeline(ChannelPipeline pipeline,
}
pipeline.addLast(globalStateWrapperEvenLoop, "handler", this);
+ HttpInitialHandler httpInitialHandler = new HttpInitialHandler<>(this);
+ pipeline.addLast(globalStateWrapperEvenLoop, "httpInitialHandler", httpInitialHandler);
}
/**
diff --git a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
index 144b0f75e..98ca3eeb3 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
@@ -300,7 +300,7 @@ public EventLoopGroup getProxyToServerWorkerPoolForTransport(TransportProtocol p
return getThreadPoolsForProtocol(protocol).getProxyToServerWorkerPool();
}
- public ExecutorService getPayloadProcessingExecutor() {
+ public ExecutorService getMessageProcessingExecutor() {
return payloadProcessingExecutor;
}
diff --git a/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java b/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java
new file mode 100644
index 000000000..ca59f5011
--- /dev/null
+++ b/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java
@@ -0,0 +1,36 @@
+package org.littleshoot.proxy.impl;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+
+public class UpstreamConnectionHandler extends ChannelInboundHandlerAdapter {
+
+ private final ClientToProxyConnection clientToProxyConnection;
+
+ UpstreamConnectionHandler(ClientToProxyConnection clientToProxyConnection) {
+ this.clientToProxyConnection = clientToProxyConnection;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object request) {
+ final ConnectionState connectionState =
+ clientToProxyConnection.setupUpstreamConnection(((Request)request).getShortCircuitResponse(),
+ ((Request)request).getInitialRequest());
+ clientToProxyConnection.become(connectionState);
+ }
+
+ public static class Request {
+ private final HttpRequest initialRequest;
+ private final HttpResponse shortCircuitResponse;
+
+ public Request(HttpRequest initialRequest, HttpResponse shortCircuitResponse) {
+ this.initialRequest = initialRequest;
+ this.shortCircuitResponse = shortCircuitResponse;
+ }
+
+ HttpRequest getInitialRequest() { return initialRequest; }
+ HttpResponse getShortCircuitResponse() { return shortCircuitResponse; }
+ }
+}
From 5066c8e6fd421a41b89ef3607aa24e8c40e314bb Mon Sep 17 00:00:00 2001
From: Slava Fomin
Date: Fri, 11 Jan 2019 00:49:21 +0200
Subject: [PATCH 29/43] Code refactoring 2
---
.../proxy/impl/ProxyConnection.java | 4 --
.../proxy/impl/ProxyToServerConnection.java | 45 +++++++++++--------
2 files changed, 27 insertions(+), 22 deletions(-)
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
index c79227c1f..4792daf6a 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
@@ -180,10 +180,6 @@ private void readHTTP(ChannelHandlerContext ctx, HttpObject httpObject) {
}
}
- private boolean afterPayloadProcessor(ChannelHandlerContext ctx) {
- return ctx.name().equals("handlerEnd");
- }
-
/**
* Implement this to handle reading the initial object (e.g.
* {@link HttpRequest} or {@link HttpResponse}).
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index c159681f0..7f406e599 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -12,6 +12,7 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
+import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.codec.http.FullHttpResponse;
@@ -244,24 +245,32 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object http
rememberCurrentResponse(httpResponse);
- final HttpResponse resp = httpResponse;
+ ctx.fireChannelRead(httpResponse);
- if (ProxyUtils.isChunked(httpResponse)) {
- respondWith(resp);
- return AWAITING_CHUNK;
- } else {
- if (resp instanceof ReferenceCounted) {
- LOG.debug("Retaining reference counted message");
- ((ReferenceCounted) resp).retain();
- }
+ return getCurrentState();
+ }
- proxyServer.getPayloadProcessorExecutor()
- .execute(clientConnection.wrapTask(() -> {
- respondWith(resp);
- currentFilters.serverToProxyResponseReceived();
- super.become(AWAITING_INITIAL);
- }));
- return getCurrentState();
+ public class RespondToCientHandler extends SimpleChannelInboundHandler {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+ final HttpResponse httpResponse = (HttpResponse) msg;
+ if (ProxyUtils.isChunked(httpResponse)) {
+ respondWith(httpResponse);
+ become(AWAITING_CHUNK);
+ } else {
+ if (httpResponse instanceof ReferenceCounted) {
+ LOG.debug("Retaining reference counted message");
+ ((ReferenceCounted) httpResponse).retain();
+ }
+
+ proxyServer.getPayloadProcessorExecutor()
+ .execute(clientConnection.wrapTask(() -> {
+ respondWith(httpResponse);
+ currentFilters.serverToProxyResponseReceived();
+ become(AWAITING_INITIAL);
+ }));
+ }
}
}
@@ -952,8 +961,8 @@ private void initChannelPipeline(ChannelPipeline pipeline,
}
pipeline.addLast(globalStateWrapperEvenLoop, "handler", this);
- HttpInitialHandler httpInitialHandler = new HttpInitialHandler<>(this);
- pipeline.addLast(globalStateWrapperEvenLoop, "httpInitialHandler", httpInitialHandler);
+ pipeline.addLast(globalStateWrapperEvenLoop, "httpInitialHandler", new HttpInitialHandler<>(this));
+ pipeline.addLast(globalStateWrapperEvenLoop, "respondToClientHandler", new RespondToCientHandler());
}
/**
From 4fd951cd241cd2d572b72df43b6cbcf2bf92a717 Mon Sep 17 00:00:00 2001
From: Slava Fomin
Date: Fri, 11 Jan 2019 01:23:13 +0200
Subject: [PATCH 30/43] Fix ref counting
---
.../proxy/impl/ClientToProxyConnection.java | 70 ++++++++++---------
.../proxy/impl/HttpInitialHandler.java | 6 +-
.../proxy/impl/ProxyToServerConnection.java | 4 +-
.../proxy/impl/UpstreamConnectionHandler.java | 6 +-
4 files changed, 46 insertions(+), 40 deletions(-)
diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
index ae7b11af4..4d567e7e7 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
@@ -351,58 +351,64 @@ protected class ClientToProxyMessageProcessor extends ChannelInboundHandlerAdapt
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ final HttpRequest httpRequest = (HttpRequest) msg;
- if (msg instanceof ReferenceCounted) {
+ if (httpRequest instanceof ReferenceCounted) {
LOG.debug("Retaining reference counted message");
((ReferenceCounted) msg).retain();
}
- final HttpRequest httpRequest = (HttpRequest) msg;
-
if (ProxyUtils.isChunked(httpRequest)) {
- process(ctx, msg, httpRequest);
+ process(ctx, httpRequest);
} else {
proxyServer.getPayloadProcessorExecutor()
- .execute(wrapTask(() -> process(ctx, msg, httpRequest)));
+ .execute(wrapTask(() -> process(ctx, httpRequest)));
}
}
- private void process(ChannelHandlerContext ctx, Object msg, HttpRequest httpRequest) {
+ private void process(ChannelHandlerContext ctx, HttpRequest httpRequest) {
- boolean authenticationRequired = authenticationRequired(httpRequest);
+ boolean authenticationRequired = false;
+ HttpResponse shortCircuitResponse = null;
+ try {
+ authenticationRequired = authenticationRequired(httpRequest);
- if (authenticationRequired) {
- LOG.debug("Not authenticated!!");
- become(AWAITING_PROXY_AUTHENTICATION);
- } else {
- // Make a copy of the original request
- final HttpRequest currentRequest = copy(httpRequest);
+ if (authenticationRequired) {
+ LOG.debug("Not authenticated!!");
+ become(AWAITING_PROXY_AUTHENTICATION);
+ } else {
+ // Make a copy of the original request
+ final HttpRequest currentRequest = copy(httpRequest);
- // Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response
- // should not be filtered), fall back to the default no-op filter source.
- HttpFilters filterInstance;
- try {
- filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx);
- } finally {
- // releasing a copied http request
- if (currentRequest instanceof ReferenceCounted) {
- ((ReferenceCounted) currentRequest).release();
+ // Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response
+ // should not be filtered), fall back to the default no-op filter source.
+ HttpFilters filterInstance;
+ try {
+ filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx);
+ } finally {
+ // releasing a copied http request
+ if (currentRequest instanceof ReferenceCounted) {
+ ((ReferenceCounted) currentRequest).release();
+ }
+ }
+ if (filterInstance != null) {
+ currentFilters = filterInstance;
+ } else {
+ currentFilters = HttpFiltersAdapter.NOOP_FILTER;
}
- }
- if (filterInstance != null) {
- currentFilters = filterInstance;
- } else {
- currentFilters = HttpFiltersAdapter.NOOP_FILTER;
- }
- // Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required
- final HttpResponse shortCircuitResponse = currentFilters.clientToProxyRequest(httpRequest);
+ // Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required
+ shortCircuitResponse = currentFilters.clientToProxyRequest(httpRequest);
- if (msg instanceof ReferenceCounted) {
+ }
+ } catch (Exception e) {
+ if (httpRequest instanceof ReferenceCounted) {
LOG.debug("Retaining reference counted message");
- ((ReferenceCounted) msg).retain();
+ ((ReferenceCounted) httpRequest).release();
}
+ }
+ if (!authenticationRequired) {
ctx.fireChannelRead(new UpstreamConnectionHandler.Request(httpRequest, shortCircuitResponse));
}
}
diff --git a/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java b/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java
index 311e11e42..58f4d9dcf 100644
--- a/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java
+++ b/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java
@@ -1,10 +1,10 @@
package org.littleshoot.proxy.impl;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpObject;
-public class HttpInitialHandler extends SimpleChannelInboundHandler {
+public class HttpInitialHandler extends ChannelInboundHandlerAdapter {
private final ProxyConnection proxyConnection;
@@ -13,7 +13,7 @@ public class HttpInitialHandler extends SimpleChannelInbou
}
@Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
final ConnectionState connectionState = proxyConnection.readHTTPInitial(ctx, msg);
proxyConnection.become(connectionState);
}
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index 7f406e599..fd6404902 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -250,7 +250,7 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object http
return getCurrentState();
}
- public class RespondToCientHandler extends SimpleChannelInboundHandler {
+ public class RespondToClientHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
@@ -962,7 +962,7 @@ private void initChannelPipeline(ChannelPipeline pipeline,
pipeline.addLast(globalStateWrapperEvenLoop, "handler", this);
pipeline.addLast(globalStateWrapperEvenLoop, "httpInitialHandler", new HttpInitialHandler<>(this));
- pipeline.addLast(globalStateWrapperEvenLoop, "respondToClientHandler", new RespondToCientHandler());
+ pipeline.addLast(globalStateWrapperEvenLoop, "respondToClientHandler", new RespondToClientHandler());
}
/**
diff --git a/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java b/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java
index ca59f5011..f366caaf5 100644
--- a/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java
+++ b/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java
@@ -1,11 +1,11 @@
package org.littleshoot.proxy.impl;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
-public class UpstreamConnectionHandler extends ChannelInboundHandlerAdapter {
+public class UpstreamConnectionHandler extends SimpleChannelInboundHandler