-
Notifications
You must be signed in to change notification settings - Fork 15
Async message processing logic #59
Changes from all commits
a83d24b
8155561
02fc440
ad56e61
58fc61d
dab7744
e291eae
de5ac21
ac1ea26
f38c65b
f2c7ef2
6d58bc3
c9e9958
81efa84
bb1e3e9
e9cadd7
083a300
087d4ae
cddbc1e
35346f9
beee58a
44a3278
006935e
bfe4ee1
7cf4313
c99ca94
ca235d5
a73d0bf
5066c8e
4fd951c
aed95cf
a0636c6
62156df
0c9757a
c841821
cb07871
7f33692
33ec33e
adea07c
32b7572
9d700f4
b28a17e
ed3d90a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,8 @@ | |
| import io.netty.buffer.ByteBuf; | ||
| import io.netty.buffer.Unpooled; | ||
| import io.netty.channel.Channel; | ||
| import io.netty.channel.ChannelHandlerContext; | ||
| import io.netty.channel.ChannelInboundHandlerAdapter; | ||
| import io.netty.channel.ChannelPipeline; | ||
| import io.netty.handler.codec.http.DefaultHttpRequest; | ||
| import io.netty.handler.codec.http.FullHttpRequest; | ||
|
|
@@ -21,13 +23,16 @@ | |
| import io.netty.handler.timeout.IdleStateHandler; | ||
| import io.netty.handler.traffic.GlobalTrafficShapingHandler; | ||
| import io.netty.util.ReferenceCounted; | ||
| import io.netty.util.concurrent.EventExecutorGroup; | ||
| import io.netty.util.concurrent.Future; | ||
| import io.netty.util.concurrent.GenericFutureListener; | ||
|
|
||
| import org.apache.commons.lang3.StringUtils; | ||
| import org.littleshoot.proxy.ActivityTracker; | ||
| import org.littleshoot.proxy.DefaultFailureHttpResponseComposer; | ||
| import org.littleshoot.proxy.ExceptionHandler; | ||
| import org.littleshoot.proxy.FailureHttpResponseComposer; | ||
| import org.littleshoot.proxy.GlobalStateHandler; | ||
| import org.littleshoot.proxy.ratelimit.RateLimiter; | ||
| import org.littleshoot.proxy.FlowContext; | ||
| import org.littleshoot.proxy.FullFlowContext; | ||
|
|
@@ -44,6 +49,7 @@ | |
| import java.util.List; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.RejectedExecutionException; | ||
|
|
@@ -143,15 +149,17 @@ public class ClientToProxyConnection extends ProxyConnection<HttpRequest> { | |
| final DefaultHttpProxyServer proxyServer, | ||
| SslEngineSource sslEngineSource, | ||
| boolean authenticateClients, | ||
| ChannelPipeline pipeline, | ||
| GlobalTrafficShapingHandler globalTrafficShapingHandler) { | ||
| GlobalTrafficShapingHandler globalTrafficShapingHandler, | ||
| Channel channel) { | ||
|
osklyarenko marked this conversation as resolved.
|
||
| super(AWAITING_INITIAL, proxyServer, false); | ||
|
|
||
| initChannelPipeline(pipeline); | ||
| this.channel = channel; | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. initial the channel is assigned on |
||
|
|
||
| initChannelPipeline(channel.pipeline()); | ||
|
|
||
| if (sslEngineSource != null) { | ||
| LOG.debug("Enabling encryption of traffic from client to proxy"); | ||
| encrypt(pipeline, sslEngineSource.newSslEngine(), | ||
| encrypt(channel.pipeline(), sslEngineSource.newSslEngine(), | ||
| authenticateClients) | ||
| .addListener( | ||
| new GenericFutureListener<Future<? super Channel>>() { | ||
|
|
@@ -177,7 +185,8 @@ public void operationComplete( | |
| **************************************************************************/ | ||
|
|
||
| @Override | ||
| protected ConnectionState readHTTPInitial(HttpRequest httpRequest) { | ||
| protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpRequestObj) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why did we change
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, I had some weird behavior with generic here.. |
||
| HttpRequest httpRequest = (HttpRequest) httpRequestObj; | ||
| LOG.debug("Received raw request: {}", httpRequest); | ||
|
|
||
| // if we cannot parse the request, immediately return a 400 and close the connection, since we do not know what state | ||
|
|
@@ -195,14 +204,9 @@ protected ConnectionState readHTTPInitial(HttpRequest httpRequest) { | |
| return DISCONNECT_REQUESTED; | ||
| } | ||
|
|
||
| boolean authenticationRequired = authenticationRequired(httpRequest); | ||
| ctx.fireChannelRead(httpRequest); | ||
|
|
||
| if (authenticationRequired) { | ||
| LOG.debug("Not authenticated!!"); | ||
| return AWAITING_PROXY_AUTHENTICATION; | ||
| } else { | ||
| return doReadHTTPInitial(httpRequest); | ||
| } | ||
| return getCurrentState(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -223,34 +227,11 @@ protected ConnectionState readHTTPInitial(HttpRequest httpRequest) { | |
| * @param httpRequest | ||
| * @return | ||
| */ | ||
| private ConnectionState doReadHTTPInitial(HttpRequest httpRequest) { | ||
| // Make a copy of the original request | ||
| final HttpRequest currentRequest = copy(httpRequest); | ||
|
|
||
| // Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response | ||
| // should not be filtered), fall back to the default no-op filter source. | ||
| HttpFilters filterInstance; | ||
| try { | ||
| filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx); | ||
| } finally { | ||
| // releasing a copied http request | ||
| if (currentRequest instanceof ReferenceCounted) { | ||
| ((ReferenceCounted)currentRequest).release(); | ||
| } | ||
| } | ||
| if (filterInstance != null) { | ||
| currentFilters = filterInstance; | ||
| } else { | ||
| currentFilters = HttpFiltersAdapter.NOOP_FILTER; | ||
| } | ||
|
|
||
| // Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required | ||
| HttpResponse clientToProxyFilterResponse = currentFilters.clientToProxyRequest(httpRequest); | ||
| public ConnectionState setupUpstreamConnection(HttpResponse shortCircuitResponse, HttpRequest httpRequest) { | ||
| if (shortCircuitResponse != null) { | ||
| LOG.debug("Responding to client with short-circuit response from filter: {}", shortCircuitResponse); | ||
|
|
||
| if (clientToProxyFilterResponse != null) { | ||
| LOG.debug("Responding to client with short-circuit response from filter: {}", clientToProxyFilterResponse); | ||
|
|
||
| boolean keepAlive = respondWithShortCircuitResponse(clientToProxyFilterResponse); | ||
| boolean keepAlive = respondWithShortCircuitResponse(shortCircuitResponse); | ||
| if (keepAlive) { | ||
| return AWAITING_INITIAL; | ||
| } else { | ||
|
|
@@ -366,6 +347,96 @@ private ConnectionState doReadHTTPInitial(HttpRequest httpRequest) { | |
| } | ||
| } | ||
|
|
||
| @Sharable | ||
| protected class ClientToProxyMessageProcessor extends ChannelInboundHandlerAdapter { | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this handler contains authentication and payload transformation logic which is executed in a separate executor service There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could we move it to a separate class / java file?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this class encapsulate some code of client to proxy connection.. it is easier to have it here.. |
||
|
|
||
| @Override | ||
| public void channelRead(ChannelHandlerContext ctx, Object msg) { | ||
| final HttpRequest httpRequest = (HttpRequest) msg; | ||
|
|
||
| if (ProxyUtils.isChunked(httpRequest)) { | ||
| process(ctx, httpRequest); | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can not do async call when chunks are used. It is due to https://github.com/netty/netty/blob/6fdd7fcddbe964b2f30d7492a926f4f0bf0f083f/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java#L321. The app does not use chunks as they are aggregated by https://github.com/verygoodsecurity/LittleProxy/blob/vgs-edition/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java#L817 |
||
| } else { | ||
| if (httpRequest instanceof ReferenceCounted) { | ||
| LOG.debug("Retaining reference counted message"); | ||
| ((ReferenceCounted) msg).retain(); | ||
| } | ||
|
|
||
| proxyServer.getMessageProcessingExecutor() | ||
| .execute(wrapTask(() -> { | ||
| try { | ||
| process(ctx, httpRequest); | ||
| } catch (Exception e) { | ||
| ctx.fireExceptionCaught(e); | ||
| } finally { | ||
| if (httpRequest instanceof ReferenceCounted) { | ||
| LOG.debug("Retaining reference counted message"); | ||
| ((ReferenceCounted) httpRequest).release(); | ||
| } | ||
| } | ||
| })); | ||
| } | ||
| } | ||
|
|
||
| private void process(ChannelHandlerContext ctx, HttpRequest httpRequest) { | ||
|
|
||
| boolean authenticationRequired = false; | ||
| HttpResponse shortCircuitResponse = null; | ||
| authenticationRequired = authenticationRequired(httpRequest); | ||
|
|
||
| if (authenticationRequired) { | ||
| LOG.debug("Not authenticated!!"); | ||
| become(AWAITING_PROXY_AUTHENTICATION); | ||
| } else { | ||
| // Make a copy of the original request | ||
| final HttpRequest currentRequest = copy(httpRequest); | ||
|
|
||
| // Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response | ||
| // should not be filtered), fall back to the default no-op filter source. | ||
| HttpFilters filterInstance; | ||
| try { | ||
| filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx); | ||
| } finally { | ||
| // releasing a copied http request | ||
| if (currentRequest instanceof ReferenceCounted) { | ||
| ((ReferenceCounted) currentRequest).release(); | ||
| } | ||
| } | ||
| if (filterInstance != null) { | ||
| currentFilters = filterInstance; | ||
| } else { | ||
| currentFilters = HttpFiltersAdapter.NOOP_FILTER; | ||
| } | ||
|
|
||
| // Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required | ||
| shortCircuitResponse = currentFilters.clientToProxyRequest(httpRequest); | ||
|
|
||
| } | ||
|
|
||
| if (!authenticationRequired) { | ||
| if (httpRequest instanceof ReferenceCounted) { | ||
| LOG.debug("Retaining reference counted message"); | ||
| ((ReferenceCounted) httpRequest).retain(); | ||
| } | ||
|
|
||
| ctx.fireChannelRead(new UpstreamConnectionHandler.Request(httpRequest, shortCircuitResponse)); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Runnable wrapTask(Runnable task) { | ||
| return () -> { | ||
| final Optional<GlobalStateHandler> globalStateHandler = | ||
| Optional.ofNullable(proxyServer.getGlobalStateHandler()); | ||
| try { | ||
| globalStateHandler.ifPresent(it -> it.restoreFromChannel(channel)); | ||
| task.run(); | ||
|
osklyarenko marked this conversation as resolved.
|
||
| } finally { | ||
| globalStateHandler.ifPresent(GlobalStateHandler::clear); | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Returns true if the specified request is a request to an origin server, rather than to a proxy server. If this | ||
| * request is being MITM'd, this method always returns false. The format of requests to a proxy server are defined | ||
|
|
@@ -800,8 +871,10 @@ private void initChannelPipeline(ChannelPipeline pipeline) { | |
| pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(this)); | ||
| } | ||
|
|
||
| pipeline.addLast("bytesReadMonitor", bytesReadMonitor); | ||
| pipeline.addLast("bytesWrittenMonitor", bytesWrittenMonitor); | ||
| EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(this); | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this event loop will not always be used. Only when a call to The reason is that in the same loop |
||
|
|
||
| pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor); | ||
| pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor); | ||
|
|
||
| pipeline.addLast("encoder", new HttpResponseEncoder()); | ||
| // We want to allow longer request lines, headers, and chunks | ||
|
|
@@ -818,8 +891,8 @@ private void initChannelPipeline(ChannelPipeline pipeline) { | |
| aggregateContentForFiltering(pipeline, numberOfBytesToBuffer); | ||
| } | ||
|
|
||
| pipeline.addLast("requestReadMonitor", requestReadMonitor); | ||
| pipeline.addLast("responseWrittenMonitor", responseWrittenMonitor); | ||
| pipeline.addLast(globalStateWrapperEvenLoop, "requestReadMonitor", requestReadMonitor); | ||
| pipeline.addLast(globalStateWrapperEvenLoop, "responseWrittenMonitor", responseWrittenMonitor); | ||
|
|
||
| pipeline.addLast( | ||
| "idle", | ||
|
|
@@ -830,7 +903,10 @@ private void initChannelPipeline(ChannelPipeline pipeline) { | |
| pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(this)); | ||
| } | ||
|
|
||
| pipeline.addLast("handler", this); | ||
| pipeline.addLast(globalStateWrapperEvenLoop, "router", this); | ||
| pipeline.addLast(globalStateWrapperEvenLoop, "httpInitialHandler", new HttpInitialHandler<>(this)); | ||
| pipeline.addLast(globalStateWrapperEvenLoop, "clientToProxyMessageProcessor", new ClientToProxyMessageProcessor()); | ||
| pipeline.addLast(globalStateWrapperEvenLoop, "upstreamConnectionHandler", new UpstreamConnectionHandler(this)); | ||
|
|
||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will be provided from outside, possibly in a wrapper with added metrics
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i like the idea of a specialized wrapper.